netpoll原理

  1. 学习
    1. netpoll init阶段
    2. setup阶段
    3. 发包
  • 后记
  • 参考
  • 学习

    收发数据包有几种方式:中断,就是收发包由网卡发送中断给cpu通知处理;poll,直接poll轮询发包收包,不需要中断通知;napi,相当于中断和poll混合方式,就是高速网络中不可能说来一个包产生一次中断,这对cpu来说负荷太大了,所以可以这样搞,当数据包来的时候,产生中断,然后cpu收到中断后就去poll一段时间去处理所有过来的数据包。

    为什么要有netpoll呢?为什么要有纯poll的方式?在一些场景下比较有用,比如panic的时候,这时候中断可能就失效了,采用中断的工作方式就行不通了,但是poll依旧可以;还有种场景,就是协议栈出问题了,那中断或者napi的方式可能就不好用了,这时候也可以使用netpoll的方式来debug之类的

    一般场景中,网卡收发包之后到内核缓冲区,然后要经过内核协议栈的处理,而netpoll其实可以不用经过这些协议栈,直接调用驱动发包之类的(所以这就是之前在搞netconsole的时候,tcpdump无法在发送端抓包的原因,因为netpoll发包直接给驱动了绕过了协议栈)。所以,至此可以明白netpoll的使用场景了,大概就是内核网络协议栈崩了或者系统panic这种紧急情况下能够继续转发包

    netpoll init阶段
    //在core_init阶段就初始化的
    core_initcall(netpoll_init);
    
    netpoll_init
    //这里这个skb_poll是一个static的sk_buff_head结构体
    //所以应该相当于netpoll存储skb的内核缓冲区吧
    //这个其实就是单纯的把sk_buff_head结构体初始化一下
    skb_queue_head_init(&skb_pool);
    

    这里其实init就完成了,其实就是把sk_buff_head这个结构体初始化了一下

    setup阶段

    以netconsole为例,在调netpoll转发数据包之前,得需要先构造一个netpoll结构体然后去调用netpoll_setup函数去setup这个结构体

    //只留主要流程看一下
    int netpoll_setup(struct netpoll *np)
    {
    //这里通过传的netpoll中的dev_name参数来获取对应的net_device结构体
    //所以为什么说如果网卡还没初始化好不能加载netconsole模块呢,因为那时候通过dev_get_by_name获取不到net_device结构体
    	if (np->dev_name[0]) {
    		struct net *net = current->nsproxy->net_ns;
    		ndev = __dev_get_by_name(net, np->dev_name);
    	}
    //然后hold一下,相当于加一下引用计数
    //防止在使用设备期间释放设备
    //这块需要注意,想起之前写dump虚机vring信息时候,就没注意这块,不能rmmod了
    //使用设备的时候,比如net_device、gendisk,就需要增加引用计数,对应的,用完之后要put减掉引用计数
    	dev_hold(ndev);
    
    
    	if (!np->local_ip.ip) {
      //这块就是如果传来的np里没有指定ip的话,就获取ip
    	}
    
    	/* fill up the skb queue */
      //这里是refill一下skb,其实就是把skb_pool上挂一些空的skb(通过alloc_skb)到上限先
      //后面的发数据包直接就找空的skb填充了
    	refill_skbs();
    
      //然后调用这个函数
    	err = __netpoll_setup(np, ndev);
      
    put:
    	dev_put(ndev);
    unlock:
    	rtnl_unlock();
    	return err;
    }
    
    
    int __netpoll_setup(struct netpoll *np, struct net_device *ndev)
    {
    //填充netpoll结构体
    	np->dev = ndev;
    	strscpy(np->dev_name, ndev->name, IFNAMSIZ);
    
    //如果网卡上没有npinfo(netpoll_info结构体),就构造一个并初始化
    	if (!ndev->npinfo) {
    		npinfo = kmalloc(sizeof(*npinfo), GFP_KERNEL);
    		if (!npinfo) {
    			err = -ENOMEM;
    			goto out;
    		}
    
    		sema_init(&npinfo->dev_lock, 1);
    		skb_queue_head_init(&npinfo->txq);
    		INIT_DELAYED_WORK(&npinfo->tx_work, queue_process);
    
    		refcount_set(&npinfo->refcnt, 1);
        //这块这个ops的赋值赋的网卡的netdev_ops
        //这里面有一堆回调的callback,应该就是对网卡层面的操作
    		ops = np->dev->netdev_ops;
    		if (ops->ndo_netpoll_setup) {
    			err = ops->ndo_netpoll_setup(ndev, npinfo);
    			if (err)
    				goto free_npinfo;
    		}
    	} else {
    		npinfo = rtnl_dereference(ndev->npinfo);
    		refcount_inc(&npinfo->refcnt);
    	}
    
    //把网卡netpoll赋值成当前传进来的netpoll指针,也就是把之前构造的netpoll结构体和npinfo关联上了
    //这块可以看出来,一个网卡上只能挂一个netpoll结构体?出于什么原因呢?为什么这么设计?
    	npinfo->netpoll = np;
    
      //然后把npinfo和网卡关联上,其实就是把netpoll和网卡关联上了
    	/* last thing to do is link it to the net device structure */
    	rcu_assign_pointer(ndev->npinfo, npinfo);
    
    	return 0;
    
    free_npinfo:
    	kfree(npinfo);
    out:
    	return err;
    }
    

    这里就完事了,主要做的是什么呢?初始化netpoll、npinfo结构体然后把他们和指定的网卡关联

    发包

    如netconsole中提到的,当printk向netconsole发消息时,其实就是调用netconsole的write_msg函数。而write_msg函数其实就是调用netpoll_send_udp去发包

    //这里就是发udp包了,还是看一下主要的流程
    void netpoll_send_udp(struct netpoll *np, const char *msg, int len)
    {
    	int total_len, ip_len, udp_len;
    	struct sk_buff *skb;
    	struct udphdr *udph;
    	struct iphdr *iph;
    	struct ethhdr *eth;
    	static atomic_t ip_ident;
    	struct ipv6hdr *ip6h;
    //这里把传来的msg的长度加上udphdr结构体的长度
    //udphdr结构体在收发包过程中很重要,就是记录源 目的端口号 报文长度之类的,如下
    //struct udphdr {
    //	__be16	source;
    //	__be16	dest;
    //	__be16	len;
    //	__sum16	check;
    //};
    	udp_len = len + sizeof(*udph);
    
    //这里又根据ipv4或ipv6进行区分,加上对应的ip头
    //后面把ipv6的处理删掉了,只看ipv4吧,处理过程差不多的应该
    	if (np->ipv6)
    		ip_len = udp_len + sizeof(*ip6h);
    	else
    		ip_len = udp_len + sizeof(*iph);
    
    //所以总长度就是msg + udph + iphdr,再加上一些预留空间
    	total_len = ip_len + LL_RESERVED_SPACE(np->dev);
    
    //从netpoll中找到一个可用的skb
    //其实就是在init阶段初始化的并在setup阶段refille的skb_pool中找一个可用的skb
    	skb = find_skb(np, total_len + np->dev->needed_tailroom,
    		       total_len - len);
    //把msg拷贝到skb中去
    	skb_copy_to_linear_data(skb, msg, len);
    //在skb的tail增加东西
    //主要就是增加tail指针和len
    //这里其实就是相当于把上一步中copy的msg现在加上对应的长度?
    	skb_put(skb, len);
    //这里是在skb的head增加东西
    //加的是udph的长度
    	skb_push(skb, sizeof(*udph));
    //然后重置skb->transport_header
    //因为这时候skb->data已经被修改了
    	skb_reset_transport_header(skb);
    
    //从skb中获取udph头
    //然后填充
    	udph = udp_hdr(skb);
    	udph->source = htons(np->local_port);
    	udph->dest = htons(np->remote_port);
    	udph->len = htons(udp_len);
    
    	if (np->ipv6) {
    	} else {
    		udph->check = 0;
        //计算checksum
    		udph->check = csum_tcpudp_magic(np->local_ip.ip,
    						np->remote_ip.ip,
    						udp_len, IPPROTO_UDP,
    						csum_partial(udph, udp_len, 0));
    		if (udph->check == 0)
    			udph->check = CSUM_MANGLED_0;
        //在skb头插入iph长度
        //重置skb,然后获取skb的iph头
    		skb_push(skb, sizeof(*iph));
    		skb_reset_network_header(skb);
    		iph = ip_hdr(skb);
    
    		/* iph->version = 4; iph->ihl = 5; */
        //填充各个字段
    		*(unsigned char *)iph = 0x45;
    		iph->tos      = 0;
    		put_unaligned(htons(ip_len), &(iph->tot_len));
    		iph->id       = htons(atomic_inc_return(&ip_ident));
    		iph->frag_off = 0;
    		iph->ttl      = 64;
    		iph->protocol = IPPROTO_UDP;
    		iph->check    = 0;
    		put_unaligned(np->local_ip.ip, &(iph->saddr));
    		put_unaligned(np->remote_ip.ip, &(iph->daddr));
    		iph->check    = ip_fast_csum((unsigned char *)iph, iph->ihl);
    
        //在skb中插入mac地址的长度
    		eth = skb_push(skb, ETH_HLEN);
    		skb_reset_mac_header(skb);
    		skb->protocol = eth->h_proto = htons(ETH_P_IP);
    	}
    
    //把源mac和目的mac地址复制到eth中
    	ether_addr_copy(eth->h_source, np->dev->dev_addr);
    	ether_addr_copy(eth->h_dest, np->remote_mac);
    
    //指定网卡设备
    	skb->dev = np->dev;
    
    //发送skb
    	netpoll_send_skb(np, skb);
    }
    
    
    netpoll_send_skb
    //这里会关中断
    __netpoll_send_skb
    
    static netdev_tx_t __netpoll_send_skb(struct netpoll *np, struct sk_buff *skb)
    {
    	netdev_tx_t status = NETDEV_TX_BUSY;
    	struct net_device *dev;
    	unsigned long tries;
    	/* It is up to the caller to keep npinfo alive. */
    	struct netpoll_info *npinfo;
    
    //确保中断被禁用
    	lockdep_assert_irqs_disabled();
    
    	dev = np->dev;
    	npinfo = rcu_dereference_bh(dev->npinfo);
    
    	if (!npinfo || !netif_running(dev) || !netif_device_present(dev)) {
    		dev_kfree_skb_irq(skb);
    		return NET_XMIT_DROP;
    	}
    
    	/* don't get messages out of order, and no recursion */
    //检查发送队列的长度是否为0,,为什么需要为0?因为需要按照顺序发送,如果发送队列中还有内容的话就应该先把发送队列中的内容发送出去
    //检查当前的执行上下文是否已经在处理特定网络设备的netpoll操作,防止嵌套调用netpoll
    	if (skb_queue_len(&npinfo->txq) == 0 && !netpoll_owner_active(dev)) {
    		struct netdev_queue *txq;
        //选择一个合适的发送队列
    		txq = netdev_core_pick_tx(dev, skb, NULL);
    
    		/* try until next clock tick */
    		for (tries = jiffies_to_usecs(1)/USEC_PER_POLL;
    		     tries > 0; --tries) {
    			if (HARD_TX_TRYLOCK(dev, txq)) {
            //只要传输队列不停止,就调用netpoll_start_xmit发送包
    				if (!netif_xmit_stopped(txq))
    					status = netpoll_start_xmit(skb, dev, txq);
    
    				HARD_TX_UNLOCK(dev, txq);
            //如果发包成功,就跳出
    				if (dev_xmit_complete(status))
    					break;
    
    			}
          //tick网卡可能有一些其他的cleanup等事情需要处理?
    			/* tickle device maybe there is some cleanup */
    			netpoll_poll_dev(np->dev);
    
    			udelay(USEC_PER_POLL);
    		}
    
    		WARN_ONCE(!irqs_disabled(),
    			"netpoll_send_skb_on_dev(): %s enabled interrupts in poll (%pS)\n",
    			dev->name, dev->netdev_ops->ndo_start_xmit);
    
    	}
    
      //如果没有发包完成,就把包入队到npinfo->txq
      //然后调把tx_work入队到delay workqueue中,等稍后继续发送
    	if (!dev_xmit_complete(status)) {
    		skb_queue_tail(&npinfo->txq, skb);
    		schedule_delayed_work(&npinfo->tx_work,0);
    	}
    	return NETDEV_TX_OK;
    }
    
    
    netpoll_start_xmit
    netdev_start_xmit
    //这里就是调驱动中对应的发送函数了ops->ndo_start_xmit(skb, dev);直接通过网卡发送了
    __netdev_start_xmit
    

    所以,发包的简要过程就是:填充发送的内容->填充相关协议头,比如ip头 mac头之类的->关中断->调用网卡驱动的发送函数发包->开中断

    后记

    • netpoll_owner_active是怎么检查的?

    遍历napi列表中的每一个napi结构体,如果有napi结构体中的poll_owner正好是当前的cpu,说明此时的cpu正在进行poll工作

    static int netpoll_owner_active(struct net_device *dev)
    {
    	struct napi_struct *napi;
    
    	list_for_each_entry_rcu(napi, &dev->napi_list, dev_list) {
    		if (napi->poll_owner == smp_processor_id())
    			return 1;
    	}
    	return 0;
    }
    

    关于napi这块,后续还需要再深入了解

    • 为什么说netpoll绕过了协议栈?

    因为netpoll没有走netif_receive_skb到内核协议栈,而是直接封装好包之后通过网卡驱动把包发送了出去

    参考

    https://blog.csdn.net/dog250/article/details/45788497


    转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 857879363@qq.com