经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 程序设计 » Rust » 查看文章
17. 从零开始编写一个类nginx工具, Rust中一些功能的实现
来源:cnblogs  作者:问蒙服务框架  时间:2023/10/25 10:13:04  对本文有异议

wmproxy

wmproxy将用Rust实现http/https代理, socks5代理, 反向代理, 静态文件服务器,后续将实现websocket代理, 内外网穿透等, 会将实现过程分享出来, 感兴趣的可以一起造个轮子法

项目地址

gite: https://gitee.com/tickbh/wmproxy

github: https://github.com/tickbh/wmproxy

日志功能

为了更容易理解程序中发生的情况,我们可能想要添加一些日志语句。通常在编写应用程序时这很容易。「在某种程度上,日志记录与使用 println! 相同,只是你可以指定消息的重要性」。
在rust中定义的日志级别有5种分别为errorwarninfodebugtrace
定义日志的级别是表示只关系这级别的日志及更高级别的日志:

定义log,则包含所有的级别
定义warn,则只会显示error或者warn的消息

要向应用程序添加日志记录,你需要两样东西:

  1. log crate,rust官方指定的日志级别库
  2. 一个实际将日志输出写到有用位置的适配器

当下我们选用的是流行的根据环境变量指定的适配器env_logger,它会根据环境变量中配置的值,日志等级,或者只开启指定的库等功能,或者不同的库分配不同的等级等。

Linux或者MacOs上开启功能

  1. env RUST_LOG=debug cargo run

Windows PowerShell上开启功能

  1. $env:RUST_LOG="debug"
  2. cargo run

Windows CMD上开启功能

  1. set RUST_LOG="debug"
  2. cargo run

如果我们指定库等级可以设置

  1. RUST_LOG="info,wenmeng=warn,webparse=warn"

这样就可以减少第三方库打日志给程序带来的干扰

需要在Cargo.toml中引用

  1. [dependencies]
  2. log = "0.4.20"
  3. env_logger = "0.10.0"

以下是示意代码

  1. use log::{info, warn};
  2. fn main() {
  3. env_logger::init();
  4. info!("欢迎使用软件wmproxy");
  5. warn!("现在已经成功启动");
  6. }

println!将会直接输出到stdout,当日志数据多的时候,无法进行关闭,做为第三方库,就不能干扰引用库的正常看日志,所以这只能调试的时候使用,或者少量的关键地方使用。

多个TcpListener的Accept

因为当前支持多个端口绑定,或者配置没有配置,存在None的情况,我们需要同时在一个线程中await所有的TcpListener。
在这里我们先用的是tokio::select!对多个TcpListener同时进行await。
如果此时我们没有绑定proxy的绑定地址,此时listener为None,但我们需要进行判断才知道他是否为None,如果我们用以下写法:

  1. use tokio::net::TcpListener;
  2. use std::io;
  3. #[tokio::main]
  4. async fn main() -> io::Result<()> {
  5. let mut listener: Option<TcpListener> = None;
  6. tokio::select! {
  7. // 加了if条件判断是否有值
  8. Ok((conn, addr)) = listener.as_mut().unwrap().accept(), if listener.is_some() => {
  9. println!("accept addr = {:?}", addr);
  10. }
  11. }
  12. Ok(())
  13. }

此时我们试运行,依然报以下错误:

  1. thread 'main' panicked at 'called `Option::unwrap()` on a `None` value', examples/udp.rs:9:46

也就是即使加了if条件我们也正确的执行我们的操作,因为tokio::select的每个分支必须返回Fut,此时如果为None,就不能返回Fut违反了该函数的定义,那么我们做以下封装:

  1. async fn tcp_listen_work(listen: &Option<TcpListener>) -> Option<(TcpStream, SocketAddr)> {
  2. if listen.is_some() {
  3. match listen.as_ref().unwrap().accept().await {
  4. Ok((tcp, addr)) => Some((tcp, addr)),
  5. Err(_e) => None,
  6. }
  7. } else {
  8. // 如果为None的时候,就永远返回Poll::Pending
  9. let pend = std::future::pending();
  10. let () = pend.await;
  11. None
  12. }
  13. }

如果为None的话,将其返回Poll::Pending,则该分支await的时候永远不会等到结果。
那么最终的的代码示意如下:

  1. #[tokio::main]
  2. async fn main() -> io::Result<()> {
  3. let listener: Option<TcpListener> = TcpListener::bind("127.0.0.1:8090").await.ok();
  4. tokio::select! {
  5. Some((conn, addr)) = tcp_listen_work(&listener) => {
  6. println!("accept addr = {:?}", addr);
  7. }
  8. }
  9. Ok(())
  10. }

另一种在反向代理的时候因为server的数量是不定的,所以监听的TcpListener也是不定的,此时我们用Vec<TcpListener>来做表示,那么此时,我们如何通过tokio::select来一次性await所有的accept呢?
此时我们借助futures库中的select_all来监听,但是select_all又不允许空的Vec,因为他要返回一个Fut,空的无法返回一个Fut,所以此时我们也要对其进行封装:

  1. async fn multi_tcp_listen_work(listens: &mut Vec<TcpListener>) -> (io::Result<(TcpStream, SocketAddr)>, usize) {
  2. if !listens.is_empty() {
  3. let (conn, index, _) = select_all(listens.iter_mut()
  4. .map(|listener| listener.accept().boxed())).await;
  5. (conn, index)
  6. } else {
  7. let pend = std::future::pending();
  8. let () = pend.await;
  9. unreachable!()
  10. }
  11. }

此时监听从8091-8099,我们的最终代码:

  1. #[tokio::main]
  2. async fn main() -> io::Result<()> {
  3. let listener: Option<TcpListener> = TcpListener::bind("127.0.0.1:8090").await.ok();
  4. let mut listeners = vec![];
  5. for i in 8091..8099 {
  6. listeners.push(TcpListener::bind(format!("127.0.0.1:{}", i)).await?);
  7. }
  8. tokio::select! {
  9. Some((conn, addr)) = tcp_listen_work(&listener) => {
  10. println!("accept addr = {:?}", addr);
  11. }
  12. (result, index) = multi_tcp_listen_work(&mut listeners) => {
  13. println!("index receiver = {:?}", index)
  14. }
  15. }
  16. Ok(())
  17. }

如果此时我们用

  1. telnet 127.0.0.1 8098

那么我们就可以看到输出:

  1. index receiver = 7

表示代码已正确的执行。

Rust中数据在多个线程中的共享

Rust中每个对象的所有权都仅只能有一个对象拥有,那么我们数据在在多个地方共享的时候可以怎么办呢?
在单线程中,我们可以用use std::rc::Rc;

Rc的特点

  1. 单线程的引用计数
  2. 不可变引用
  3. 非线程安全,即仅能在单线程中使用
    Rc引用计数中还有一个弱引用称为Weak,弱引用表示持有对象的一个指针,但是不添加引用计数,也不会影响数据删除,不保证一定能取得到数据。
    因为其不能修改数据,所以也常用RefCell做配合,来做引用计数的修改。
    以下是一个父类子类用弱引用计数实现的方案:
  1. use std::rc::Rc;
  2. use std::rc::Weak;
  3. use std::cell::RefCell;
  4. /// 父类拥有者
  5. struct Owner {
  6. name: String,
  7. gadgets: RefCell<Vec<Weak<Gadget>>>,
  8. }
  9. /// 子类对象
  10. struct Gadget {
  11. id: i32,
  12. owner: Rc<Owner>,
  13. }
  14. fn main() {
  15. let gadget_owner: Rc<Owner> = Rc::new(
  16. Owner {
  17. name: "wmproxy".to_string(),
  18. gadgets: RefCell::new(vec![]),
  19. }
  20. );
  21. // 生成两个小工具
  22. let gadget1 = Rc::new(
  23. Gadget {
  24. id: 1,
  25. owner: Rc::clone(&gadget_owner),
  26. }
  27. );
  28. let gadget2 = Rc::new(
  29. Gadget {
  30. id: 2,
  31. owner: Rc::clone(&gadget_owner),
  32. }
  33. );
  34. {
  35. let mut gadgets = gadget_owner.gadgets.borrow_mut();
  36. gadgets.push(Rc::downgrade(&gadget1));
  37. gadgets.push(Rc::downgrade(&gadget2));
  38. }
  39. for gadget_weak in gadget_owner.gadgets.borrow().iter() {
  40. let gadget = gadget_weak.upgrade().unwrap();
  41. println!("小工具 {} 的拥有者:{}", gadget.id, gadget.owner.name);
  42. }
  43. }

因为其并未实现Send函数,所以无法在多线程种传递。在多线程中,我们需要用Arc,但是在Arc获取可变对象的时候有限制,必须他是唯一引用的时候才能修改。

  1. use std::sync::Arc;
  2. fn main() {
  3. let mut x = Arc::new(3);
  4. *Arc::get_mut(&mut x).unwrap() = 4;
  5. assert_eq!(*x, 4);
  6. let _y = Arc::clone(&x);
  7. assert!(Arc::get_mut(&mut x).is_none());
  8. }

所以我们在多线程中的引用需要修改的时候,通常会用Atomic或者Mutex来做数据的写入的唯一性。

  1. #![allow(unused)]
  2. fn main() {
  3. use std::sync::{Arc, Mutex};
  4. use std::thread;
  5. use std::sync::mpsc::channel;
  6. const N: usize = 10;
  7. let data = Arc::new(Mutex::new(0));
  8. let (tx, rx) = channel();
  9. for _ in 0..N {
  10. let (data, tx) = (Arc::clone(&data), tx.clone());
  11. thread::spawn(move || {
  12. // 共享数据data,保证在线程中只会同时有一个对象拥有修改权限,也相当于拥有所有权,10个线程,每个线程+1,最终结果必须等于10
  13. let mut data = data.lock().unwrap();
  14. *data += 1;
  15. if *data == N {
  16. tx.send(()).unwrap();
  17. }
  18. });
  19. }
  20. rx.recv().unwrap();
  21. assert!(*data.lock().unwrap() == 10);
  22. }

结语

以上是三种编写Rust中常碰见的情况,也是在此项目中应用解决过的方案,在了解原理的情况下,解决问题可以有不同的思路。理解了原理,你就知道他设计的初衷,更好的帮助你学习相关的Rust知识。

原文链接:https://www.cnblogs.com/wmproxy/p/wmproxy17.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号