API 数据拉取脚本?

wen 实用脚本 50

本文目录导读:

API 数据拉取脚本?

  1. Python版本(最常用)
  2. Node.js版本
  3. Shell脚本版本(使用curl)
  4. 安装依赖
  5. 使用建议

下面是一个通用的 API 数据拉取脚本,分别提供 Python、Node.js 和 Shell(curl)三种语言的版本:

Python版本(最常用)

基础版 - 使用requests库

import requests
import json
import time
from typing import Dict, List, Any
def fetch_api_data(url: str, method: str = 'GET', headers: Dict = None,
                   params: Dict = None, data: Dict = None) -> Dict:
    """Send one HTTP request and return the decoded JSON body.

    Args:
        url: Full URL of the API endpoint.
        method: HTTP verb, case-insensitive; one of GET, POST, PUT or DELETE.
        headers: Optional request headers.
        params: Optional query-string parameters; sent with every verb.
        data: Optional JSON request body; only sent with POST and PUT.

    Returns:
        The decoded JSON payload, or None when the request fails, the
        server answers with an HTTP error status, or the body is not valid
        JSON. The failure is printed before None is returned.

    Raises:
        ValueError: If ``method`` is not one of the supported verbs.
    """
    verb = method.upper()
    # Validate before the try block so a caller mistake is never confused
    # with a network failure.
    if verb not in ('GET', 'POST', 'PUT', 'DELETE'):
        raise ValueError(f"不支持的请求方法: {method}")

    # Only POST and PUT carry a JSON body; GET and DELETE send none.
    body = data if verb in ('POST', 'PUT') else None
    try:
        response = requests.request(
            verb, url, headers=headers, params=params, json=body, timeout=10
        )
        response.raise_for_status()  # turn 4xx/5xx answers into exceptions
        return response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers requests releases whose JSON decode error is not
        # a RequestException subclass.
        print(f"请求失败: {e}")
        return None
# Usage example
if __name__ == "__main__":
    # Example 1: a plain GET request with auth headers and paging parameters
    url = "https://api.example.com/users"
    params = {"page": 1, "limit": 10}
    headers = {"Authorization": "Bearer your_token_here", "Content-Type": "application/json"}
    data = fetch_api_data(url=url, params=params, headers=headers)
    if data:
        # Pretty-print the payload without escaping non-ASCII characters.
        print(json.dumps(data, ensure_ascii=False, indent=2))

带分页的高级版

import requests
import time
from typing import Generator, Dict, Any
class APIClient:
    """HTTP API client with pagination, retries and client-side rate limiting.

    All requests go through a single ``requests.Session``. The client can be
    used as a context manager so the session is closed when the block exits.
    """

    # HTTP verbs accepted by ``_request``.
    _SUPPORTED_METHODS = ('GET', 'POST')

    def __init__(self, base_url: str, headers: Dict = None,
                 max_retries: int = 3, rate_limit: float = 1.0,
                 timeout: float = 10.0):
        """Create a client.

        Args:
            base_url: Root URL that endpoints are appended to.
            headers: Headers sent with every request.
            max_retries: Total number of attempts per request; values below
                1 are treated as 1.
            rate_limit: Minimum number of seconds between two requests.
            timeout: Default per-request timeout in seconds, used unless the
                caller passes its own ``timeout`` keyword to ``_request``.
        """
        self.base_url = base_url
        self.headers = headers or {}
        self.max_retries = max_retries
        self.rate_limit = rate_limit
        self.timeout = timeout
        self.session = requests.Session()
        self.last_request_time = 0

    def __enter__(self):
        """Return the client itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the underlying session when the ``with`` block exits."""
        self.close()

    def close(self):
        """Close the underlying HTTP session."""
        self.session.close()

    def _wait_for_rate_limit(self):
        """Sleep until at least ``rate_limit`` seconds separate two requests."""
        elapsed = time.time() - self.last_request_time
        if elapsed < self.rate_limit:
            time.sleep(self.rate_limit - elapsed)
        self.last_request_time = time.time()

    def _request(self, endpoint: str, method: str = 'GET', **kwargs) -> Dict:
        """Send a request, retrying failed attempts with exponential backoff.

        Args:
            endpoint: Path relative to ``base_url``.
            method: HTTP verb, case-insensitive; GET or POST.
            **kwargs: Extra keyword arguments forwarded to the session call
                (for example ``params`` or ``json``).

        Returns:
            The decoded JSON body of the response.

        Raises:
            ValueError: If ``method`` is not supported.
            requests.exceptions.RequestException: If the last attempt fails.
        """
        verb = method.upper()
        # Reject bad verbs up front: no rate-limit wait, no retry loop.
        if verb not in self._SUPPORTED_METHODS:
            raise ValueError(f"不支持的方法: {method}")

        # Strip slashes on both sides so the join never produces "//".
        url = f"{self.base_url.rstrip('/')}/{endpoint.lstrip('/')}"
        # Without a timeout a stalled connection would block forever.
        kwargs.setdefault('timeout', self.timeout)
        # Always try at least once so the method never falls through to an
        # implicit None return.
        attempts = max(1, self.max_retries)

        for attempt in range(attempts):
            try:
                self._wait_for_rate_limit()
                response = self.session.request(
                    verb, url, headers=self.headers, **kwargs
                )
                response.raise_for_status()
                return response.json()
            except requests.exceptions.RequestException:
                if attempt == attempts - 1:
                    raise  # out of attempts: surface the last error
                print(f"请求失败,重试中... (尝试 {attempt + 1}/{attempts})")
                time.sleep(2 ** attempt)  # exponential backoff: 1s, 2s, 4s, ...

    def get_paginated_data(self, endpoint: str, params: Dict = None,
                           page_key: str = 'page',
                           data_key: str = 'data') -> Generator[Dict, None, None]:
        """Yield every item of a page-numbered endpoint, one page at a time.

        The number of pages is read from the first response, using the first
        truthy value among ``total_pages``, ``totalPages`` and ``pages``. If
        none is present, only the first page is fetched.

        Args:
            endpoint: Path relative to ``base_url``.
            params: Extra query parameters; the dict is copied, never mutated.
            page_key: Name of the query parameter carrying the page number.
            data_key: Response key holding the page's items; ``results`` and
                ``items`` are tried as fallbacks.

        Yields:
            The items of each page, in order.
        """
        # Work on a copy so the caller's dict does not gain a page key.
        query = dict(params or {})
        page = 1
        total_pages = None
        while total_pages is None or page <= total_pages:
            query[page_key] = page
            response = self._request(endpoint, params=query)
            if total_pages is None:
                # Field names differ between APIs; default to a single page.
                total_pages = (response.get('total_pages')
                               or response.get('totalPages')
                               or response.get('pages')
                               or 1)
            data = (response.get(data_key)
                    or response.get('results')
                    or response.get('items'))
            if data:
                yield from data
            page += 1
# Usage example
if __name__ == "__main__":
    client = APIClient(
        "https://api.example.com",
        headers={"Authorization": "Bearer your_token"},
        rate_limit=0.5,  # at most 2 requests per second
    )
    # Stream every user across all pages, 50 per page
    users = client.get_paginated_data(
        "users", params={"limit": 50}, page_key="page", data_key="users"
    )
    for user in users:
        print(f"用户: {user['name']} - {user['email']}")

Node.js版本

// 使用 axios
const axios = require('axios');
/**
 * Send one HTTP request with axios and return the response body.
 *
 * @param {string} url - Full URL of the API endpoint.
 * @param {Object} [options] - Request options.
 * @param {string} [options.method='GET'] - HTTP verb, case-insensitive.
 * @param {Object} [options.headers={}] - Extra headers; merged over the
 *     default JSON Content-Type.
 * @param {Object} [options.params={}] - Query-string parameters; sent with
 *     every verb.
 * @param {Object} [options.data={}] - Request body; sent with every verb
 *     except GET.
 * @returns {Promise<*>} The response body, or null when the request fails
 *     (the failure is logged to stderr).
 */
async function fetchAPIData(url, options = {}) {
    try {
        const {
            method = 'GET',
            headers = {},
            params = {},
            data = {}
        } = options;
        const verb = method.toUpperCase();
        const config = {
            method: verb.toLowerCase(),
            url,
            headers: {
                'Content-Type': 'application/json',
                ...headers
            },
            // Query parameters apply to every verb, not only GET.
            params,
            timeout: 10000
        };
        if (verb !== 'GET') {
            config.data = data;
        }
        const response = await axios(config);
        return response.data;
    } catch (error) {
        if (error.response) {
            // The server answered with an error status.
            console.error(`HTTP错误: ${error.response.status}`);
            console.error('响应数据:', error.response.data);
        } else if (error.request) {
            // The request was sent but no response arrived.
            console.error('无响应:', error.message);
        } else {
            // The failure happened before the request was sent.
            console.error('请求配置错误:', error.message);
        }
        return null;
    }
}
// Usage example
/**
 * Fetch the first page of users (10 per page) and pretty-print the result.
 */
async function main() {
    const data = await fetchAPIData('https://api.example.com/users', {
        headers: { 'Authorization': 'Bearer your_token_here' },
        params: { page: 1, limit: 10 }
    });
    if (!data) {
        return;
    }
    console.log(JSON.stringify(data, null, 2));
}
main();

Shell脚本版本(使用curl)

#!/bin/bash
# API configuration. Both values can be overridden from the environment so a
# real token never has to be written into this file; the placeholders below
# are only used when the variables are unset or empty.
API_URL="${API_URL:-https://api.example.com}"
API_TOKEN="${API_TOKEN:-your_token_here}"
# fetch_api ENDPOINT [METHOD] [DATA]
#
# Send one request to ${API_URL}/ENDPOINT with the bearer token and a JSON
# content type, and print the response body to stdout.
#   ENDPOINT  path relative to API_URL (may include a query string)
#   METHOD    HTTP verb, defaults to GET
#   DATA      request body, sent for every verb except GET
# Returns curl's exit status: non-zero on transport errors, timeouts and
# HTTP error statuses (>= 400), in which case the error goes to stderr.
fetch_api() {
    local endpoint=$1
    local method=${2:-GET}
    local data=${3:-}
    # Drop one trailing/leading slash so the join never produces "//".
    local url="${API_URL%/}/${endpoint#/}"
    local curl_args=(
        --silent --show-error   # no progress meter, but still report errors
        --fail                  # treat HTTP >= 400 as a failure
        --max-time 10           # never hang on a stalled connection
        -H "Authorization: Bearer ${API_TOKEN}"
        -H "Content-Type: application/json"
    )
    if [ "$method" != "GET" ]; then
        curl_args+=(-X "$method" -d "$data")
    fi
    curl "${curl_args[@]}" "$url"
}
# Usage example: fetch the first page of users (10 per page)
printf '%s\n' "获取用户数据..."
fetch_api "users?page=1&limit=10"

安装依赖

Python:

pip install requests

Node.js:

npm install axios
# 或者
yarn add axios

使用建议

  1. 错误处理:始终为请求添加异常处理(Python 用 try/except/finally,JavaScript 用 try/catch/finally)
  2. 重试机制:对重要请求实现指数退避重试
  3. 速率限制:遵守API的调用频率限制
  4. 日志记录:记录请求和响应,便于调试
  5. 缓存:对不频繁变化的数据实现缓存
  6. 配置管理:将URL、Token等配置放在环境变量中

需要根据具体的API文档调整参数和响应处理方式。

抱歉,评论功能暂时关闭!