python脚本实现:获取光猫公网ip 阿里云ddns域名解析

这个脚本的python脚本初衷是因为在家里搭建了一个Centos服务器,和一个Nas服务器。为了远程访问家里的服务器,下载文件,特别向电信申请了公网ip,不过现在找电信客服申请公网IP地址越来越难了,有的客服直接说没有公网IP地址这个业务了,叫你申请专线。

怎么可能申请专线,专线那玩意不是一般人用得起的。

问题:

电信为了限制个人用于部署服务,尽管会分配公网ip,但是ip经常会改变。

于是又去阿里云去购买了一个域名,想将本地公网ip动态解析到域名,尽管会有域名生效的间隔期,但是也能基本满足日常的需要了。

软路由市场中有直接支持阿里ddns设置的插件,感觉别人写的,总是不太放心。

于是自己造轮子:查阅资料,通过python自己来实现。

整体思路:

获取自己的公网ip,域名解析ip是否一致

如果一致,等待一段时间,循环重复

如果不一致,调用阿里云接口设置,并等待生效。

 

关于公网ip的获取方式,采用了好多种尝试:

方案一:通过访问一些网页,从中抓取自己的公网ip(网上教程大多数都是这种)

弊端,网页服务不稳定,有可能挂掉,有可能

有可能返回错误ip,特别是使用软路由的情况下。

为了解决这些问题,采用多个网站获取ip,并取交集的方式来减小错误几率。

网上有很多代码,我的代码已经不在了,需要的自行百度即可找到。

 

方案二:用软路由拨号,通过ssh客户端返回wan口ip:

需要使用到python的 paramiko库,直接上代码:

ssh_paras = {
    "hostname": \'192.168.2.1\',
    "port": 9999,
    "username": \'root\',
    "password": \'***********\'
}

import paramiko

def get_ssh_client():
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect(**ssh_paras)
    return ssh


def get_current_wan_ip():
    cmd = \'ifconfig pppoe-WAN\'
    ssh_client = get_ssh_client()
    _, stdout, _ = ssh_client.exec_command(cmd)
    result = stdout.read()
    ssh_client.close()
    if result:
        result = result.decode("UTF-8")
        if find := re.search(r"inet addr:(\d+\.\d+\.\d+\.\d+)", result):
            ip = find.group(1)
            return ip

 

方案三:用光猫拨号,通过requests库,模拟登录,通过http返回结果解析;

通过web登录,点击操作获取wan口公网ip, 通过Wireshark抓包,分析http登录流程机制。

注意不同的路由器,光猫可能登录流程不一致,但思路是雷同的。(我的光猫的是华为的MA5671)

COOKIE = None


def get_wan_ip():
    global COOKIE
    while True:
        if COOKIE is None:
            COOKIE = get_cookie()
        t = time.time()
        res3 = requests.get("http://192.168.1.1/html/bbsp/common/wan_list.asp",
                            headers={
                                "DNT": "1",
                                "Accept": "*/*",
                                "Cookie": "Cookie=" + COOKIE,
                            })
        text = res3.text or ""
        wan_ip_find = re.search("INTERNET_R_VID.*?\"(\d+\.\d+\.\d+\.\d+)\"", text)
        if wan_ip_find:
            print(f"used time {time.time() - t}")
            return wan_ip_find.group(1)
        else:
            time.sleep(1) 
            print(time.asctime(), " cookie is outof time!")

def get_cookie():
    
    res1 = requests.post("http://192.168.1.1/asp/GetRandCount.asp")
    rand = res1.text[3:]

    data = {"x.X_HW_Token": rand}
    headers = {
        "Cookie": "Cookie=UserName:<****>:PassWord:<****>:Language:chinese:id=-1",
        "Content-Type": "application/x-www-form-urlencoded"
    }
  # <****> 代表账号和密码,需要注意密码是转码之后的形式,通过抓包工具可以看到自己的。
    res2 = requests.post("http://192.168.1.1/login.cgi", data, headers=headers)
    cookie = res2.cookies.get("Cookie")
    return cookie

 

方案四:通过telnet登录光猫,通过telnet交互获取wan口公网ip:

方案三中通过requests库模拟web登录,会造成登录的用户过多,

然后就无法通过浏览器进入光猫了(我的模拟代码太简陋,没有考虑推出登录的代码实现)

并且返回wan口公网ip的同时,会返回很多无用信息,每次执行比较耗时,浪费光猫资源。

所以想通过ssh登录光猫,去获取密码,发现这款光猫里面却没有内置ssh服务。

发现有telnet服务,于是通过telnet实现公网ip的获取:

注意:设备支持哪些telnet命令,可以通过“?”命令查询,自己在根据命令猜测其作用。

实现代码:

需要注意不同的设备,telnet的截取关键字儿可能不同,需要自己根据设备修改。

# coding=UTF-8


import re
import time
import telnetlib


def get_wan_ip():
    while True:
        try:
            tel = telnetlib.Telnet("192.168.1.1", port=23)
            tel.read_until(b"Login:")
            tel.write(b"root\n")
            tel.read_until(b"Password:")
            tel.write(b"youpassword\n")
            tel.read_until(b"WAP>")
        except:
            time.sleep(1)
            continue
        else:
            while True:
                try:
                    tel.write(b"ip route\n")
                    data = tel.read_until(b"WAP>").decode()
                    if ip_find:= re.search("ppp.*?src (\d+\.\d+\.\d+\.\d+)", data):
                        yield ip_find.group(1)
                except:
                    time.sleep(1)
                    break
                
for i in get_wan_ip():
    print(i)
    time.sleep(1)

最后:附上通过aliyun python ddns接口实现域名动态解析的代码。

  • requirements.txt
  • aliyun-python-sdk-core==2.13.35
  • aliyun-python-sdk-alidns==2.6.32
  • aliyun-python-sdk-domain==3.14.6
  • dnspython==2.1.0
  • paramiko==2.8.0
# coding=UTF-8
import re
import json
import time
import telnetlib
from aliyunsdkcore.client import AcsClient
from aliyunsdkalidns.request.v20150109.DescribeDomainRecordsRequest import DescribeDomainRecordsRequest
from aliyunsdkalidns.request.v20150109.UpdateDomainRecordRequest import UpdateDomainRecordRequest

import dns.resolver


aliyun_id = "wkelwXModwxPOIBD假的YTEEFDDD0DDDD"
aliyun_secret = "ADDFDSDEEEFD错的dCCCCxclaldoerC"
aliyun_account = "aliyun账号名"# 以上信息需要登录阿里云官网去申请,是通过这个就能访问阿里云里你的账户,设置你的购买的域名解析。
domain_lst = ["baidu.com", "你购买的域名", ]

def get_wan_ip():
    while True:
        try:
            tel = telnetlib.Telnet("192.168.1.1", port=23)
            tel.read_until(b"Login:")
            tel.write(b"root\n")
            tel.read_until(b"Password:")
            tel.write(b"000000000\n")
            tel.read_until(b"WAP>")
        except:
            time.sleep(1)
            continue
        else:
            while True:
                try:
                    tel.write(b"ip route\n")
                    data = tel.read_until(b"WAP>").decode()
                    if ip_find := re.search("ppp.*?src (\d+\.\d+\.\d+\.\d+)", data):
                        yield ip_find.group(1)
                except:
                    time.sleep(1)
                    break


def set_dns_ip(ip):
    client = AcsClient(aliyun_id, aliyun_secret)
    for domain in domain_lst:
        request = DescribeDomainRecordsRequest()
        request.set_accept_format('json')
        request.set_DomainName(domain)
        response = client.do_action_with_exception(request)
        json_data = json.loads(str(response, encoding='utf-8'))
        for record in json_data["DomainRecords"]["Record"]:
            if record["Value"] == ip:
                continue
            request = UpdateDomainRecordRequest()
            request.set_accept_format('json')
            request.set_Value(ip)
            request.set_Type(record["Type"])
            request.set_TTL(600)
            request.set_RR(record["RR"])
            request.set_RecordId(record["RecordId"])
            client.do_action_with_exception(request)


def get_domain_ip(domain):
    resolver = dns.resolver.Resolver()
    resolver.nameservers = ['61.139.2.69', '218.6.200.139', '202.98.96.68']
    return resolver.resolve(domain, 'A')[0].to_text()


def run():
    ip_getter = get_wan_ip()
    while True:
        # noinspection PyBroadException
        try:
            wan_ip = next(ip_getter)
            domain_ip = get_domain_ip(domain_lst[0])
            if wan_ip and domain_ip and wan_ip != domain_ip:
                set_dns_ip(wan_ip)
        except Exception:
            pass
        finally:
            time.sleep(30)

备注:

代码是基于Python 3.8.10开发。自己用实现有些简陋,请自行完善。

Top