怎样用C#进行仪器的设备改造?因为仪器控制程序是C#开发的,所以客户端最好是c#,考虑到节省流量(服务器是按流量收费的),文件要压缩,C#下要实现文件压缩功能。而服务端选择java构建restful API进行上传的的方案。

一、项目需求及分析

  1. 按照领导的要求,要改造一台仪器,添加点功能,将测量数据上传到服务器。仪器测量节拍大概是20s,数据量目前不大,每次测量大概不到2M左右,且都是浮点数据和整形数据。

  2. 起初想用TCP长连接实现的,但考虑到现场环境。典型的制造业车间,电磁环境复杂,网络信号不稳,所以不考虑TCP长连接实现。短连接也不在考虑范围内,以后仪器数量多了之后频繁的建立连接开销也很大,服务器有可能受不了(阿里云的乞丐版)。所以选择用restful提交,http的通信可以多线程调度。

  3. 仪器控制程序是C#开发的,所以客户端最好是c#。服务端我想用springboot,很方便。

  4. 考虑到新增的上传功能不能影响之前的测量节拍,所以要多线程实现。可惜我又很懒,不想考虑线程协调问题,最后选择消息队列实现。

  5. 考虑到节省流量(服务器是按流量收费的),文件要压缩,C#下要实现文件压缩功能。

  6. 从测量文件中读取数据,将参数存入数据库,测量原始数据打包放在文件服务器上。

二、整体架构和技术方案

最后的技术方案就是C#做客户端,java构建服务端restful API进行上传

整体架构如下图:
a.jpg

使用的技术如下:

  1. C#的Restful客户端:RestSharp

  2. java的Restful服务端:springboot

  3. C#端消息队列:NetMQ

  4. C#端zip操作组件:DotNetZip

  5. java端zip操作组件:Apache Commons Compress

三、服务端

服务端采用springboot的restful,POST方式,非常简单。
传输文件采用MultipartFile方式,因为客户端的ResrSharp只能采用这种方式传递文件

@RestController@RequestMapping(value = "upload")public class FileRestController {    Logger logger = LogManager.getLogger(FileRestController.class);    @RequestMapping(value = "file", method = RequestMethod.POST)    public    @ResponseBody    RestResult getZipFile(@RequestParam("file") MultipartFile file) throws IOException, URISyntaxException {        RestResult result = new RestResult();        if (!file.getName().isEmpty()) {            InputStream stream = file.getInputStream();//            String directory = FileRestController.class.getProtectionDomain().getCodeSource().getLocation().toURI().getPath();            String directory = "/usr/local/haliang/files/";            try {                directory = URLDecoder.decode(directory, "utf-8");            } catch (java.io.UnsupportedEncodingException e) {                return null;            }            FileOutputStream fs = new FileOutputStream(directory + file.getOriginalFilename());            logger.info("文件所在的目录:   " + directory + "/files/" + file.getOriginalFilename());            byte[] buffer = new byte[1024 * 1024];            int bytesum = 0;            int byteread = 0;            while ((byteread = stream.read(buffer)) != -1) {                bytesum += byteread;                fs.write(buffer, 0, byteread);                fs.flush();            }            fs.close();            stream.close();            logger.info("成功接收文件:   " + directory + file.getOriginalFilename());        }        return result;    }}

四、客户端

客户端架构如下图:
b.jpg

1 zeromq简介

NetMQ 是 ZeroMQ的C#移植版本。

1.1:zeromq是什么

NetMQ (ZeroMQ to .Net),ZMQ号称史上最快中间件。
它对socket通信进行了封装,使得我们不需要写socket函数调用就能完成复杂的网络通信。
它跟Socket的区别是:普通的socket是端到端的(1:1的关系),而ZMQ却是可以N:M的关系,人们对BSD套接字的了解较多的是点对点的连接,点对点连接需要显式地建立连接、销毁连接、选择协议(TCP/UDP)和处理错误等,而ZMQ屏蔽了这些细节,让你的网络编程更为简单。
它是一个消息处理队列库,可在多个线程、内核和主机盒之间弹性伸缩。和一般意义上的消息队列产品不同的是,它没有消息队列服务器,而更像是一个网络通信库。从网络通信的角度看,它处于会话层之上,应用层之下,属于传输层。

1.2:zeromq的消息模型

zeromq将消息通信分为4种模型,分别是一对一结对模型(Exclusive-Pair)、请求回应模型(Request-Reply)、发布订阅模型(Publish-Subscribe)、推拉模型(Push-Pull)。这4种模型总结出了通用的网络通信模型,在实际中可以根据应用需要,组合其中的2种或多种模型来形成自己的解决方案。

1.2.1 一对一结对模型 Exclusive-Pair

最简单的1:1消息通信模型,用来支持传统的 TCP socket模型,主要用于进程内部线程间通信。可以认为是一个TCP Connection,但是TCP Server只能接受一个连接。采用了lock free实现,速度很快。数据可以双向流动,这点不同于后面的请求响应模型。(不推荐使用,没有例子)

1.2.2 请求回应模型 Request-Reply

由请求端发起请求,然后等待回应端应答。一个请求必须对应一个回应,从请求端的角度来看是发-收配对,从回应端的角度是收-发对。跟一对一结对模型的区别在于请求端可以是1~N个。
请求端和回应端都可以是1:N的模型。通常把1认为是server,N认为是Client。ZeroMQ可以很好的支持路由功能(实现路由功能的组件叫作Device),把1:N扩展为N:M(只需要加入若干路由节点)。从这个模型看,更底层的端点地址是对上层隐藏的。每个请求都隐含有回应地址,而应用则不关心它。通常把该模型主要用于远程调用及任务分配等。
(NetMQ请求响应C#调用案例)

1.2.3 发布订阅模型 Publisher-Subscriber(本项目采用的模型)

发布端单向分发数据,且不关心是否把全部信息发送给订阅端。如果发布端开始发布信息时,订阅端尚未连接上来,则这些信息会被直接丢弃。订阅端未连接导致信息丢失的问题,可以通过与请求回应模型组合来解决。订阅端只负责接收,而不能反馈,且在订阅端消费速度慢于发布端的情况下,会在订阅端堆积数据。该模型主要用于数据分发。天气预报、微博明星粉丝可以应用这种经典模型。 (NetMQ发布订阅模式C#调用案例)

1.2.4 推拉模型 Push-Pull

Server端作为Push端,而Client端作为Pull端,如果有多个Client端同时连接到Server端,则Server端会在内部做一个负载均衡,采用平均分配的算法,将所有消息均衡发布到Client端上。与发布订阅模型相比,推拉模型在没有消费者的情况下,发布的消息不会被消耗掉;在消费者能力不够的情况下,能够提供多消费者并行消费解决方案。该模型主要用于多任务并行。
(NetMQ推拉模式C#调用案例)

1.3:zeromq的优势

  1. TCP:ZeroMQ基于消息,消息模式,而非字节流。

  2. XMPP:ZeroMQ更简单、快速、更底层。Jabber可建在ZeroMQ之上。

  3. AMQP:完成相同的工作,ZeroMQ要快100倍,而且不需要代理(规范更简洁——少278页)

  4. IPC:ZeroMQ可以跨多个主机盒,而非单台机器。

  5. CORBA:ZeroMQ不会将复杂到恐怖的消息格式强加于你。

  6. RPC:ZeroMQ完全是异步的,你可以随时增加/删除参与者。

  7. RFC 1149:ZeroMQ比它快多了!

  8. 29west LBM:ZeroMQ是自由软件!

  9. IBM低延迟:ZeroMQ是自由软件!

  10. Tibco:仍然是自由软件!

2.代码实现

2.1 Publisher(发布者)

一般都是发布者先启动,绑定监听端口。封装了一个发送函数,主要是发送原先软件生成测量文件的路径。

public class Publisher    {        public int Port { get; set; }        private PublisherSocket socket;        /// <summary>        /// 构造函数        /// </summary>        /// <param name="port">绑定的端口</param>        public Publisher(int port)        {            Port = port;        }        /// <summary>        /// 启动发布端        /// </summary>        public void Start()        {            NetMQContext context = NetMQContext.Create();            this.socket = context.CreatePublisherSocket();            this.socket.Bind("tcp://127.0.0.1:" + Port);        }        /// <summary>        /// 发送数据        /// </summary>        /// <param name="result"></param>        public void Send(string result)        {            socket.SendFrame(result);        }    }

2.2 Subscriber(订阅者)

订阅者启动时候连接端口。防止线程阻塞,订阅者是新开一个线程运行的。

public class Subscribe    {        private delegate void GetDataHandler(string message);        private event GetDataHandler onGetData;        public int Port { get; set; }        public string TempDirectory { get; set; }        public bool isRunning { get; set; }        public string domain { get; set; }        public Subscribe(int port, string domain)        {            Port = port;            this.domain = domain;            onGetData += ProcessData;        }        private SubscriberSocket socket;        public void Start()        {            this.isRunning = true;            NetMQContext context = NetMQContext.Create();            socket = context.CreateSubscriberSocket();            socket.Connect("tcp://127.0.0.1:" + Port);            socket.Subscribe("");            Thread t = new Thread(new ThreadStart(StartSub));            t.Start();        }        private void StartSub()        {            while (isRunning)            {                Thread.Sleep(10000);                string result = socket.ReceiveFrameString(Encoding.UTF8);                onGetData(result);            }        }        private void ProcessData(string path)        {            Console.WriteLine("收到文件:" + path);            string compressedFile = Compress.CompressFile(TempDirectory, path);            new RestPost(domain).Post(compressedFile);        }

3 客户端压缩

压缩使用DotNetZip组件,非常简单好用。

 public class Compress    {        public static string CompressFile(string temp,string txtPath)        {            string txtFileName = System.IO.Path.GetFileNameWithoutExtension(txtPath);            string compressedFileName = temp+"/"+txtFileName + ".zip";            ZipFile file=new ZipFile();            file.AddFile(txtPath,"");            file.Save(compressedFileName);            return compressedFileName;        }    }

4 客户端上传

使用RestSharp组件,也是非常简单。异步回调,不影响性能。

public class RestPost    {        public string Domain { get; set; }        public RestPost(string domain)        {            Domain = domain;        }        public void Post(string path)        {            RestRequest request = new RestRequest(Method.POST);            request.AddFile("file", path);            RestClient client = new RestClient {BaseUrl = new Uri("http://" + Domain + "/upload/file")};            client.ExecuteAsync(request, (response) =>                {                    if (response.StatusCode == HttpStatusCode.OK)                    {                        Console.WriteLine("上传成功...\n" + response.Content);                    }                    else                    {                        Console.WriteLine($"出错啦:{response.Content}");                    }                }            );        }    }

五、总结

  1. 写代码之前一定要搞清楚需求,设计好架构

  2. 注意消息队列启动时候的线程问题

  3. 异步执行

相关文章:

企业数据备份技术

相关视频:

数据结构探险—队列篇

更多相关文章

  1. Go语言的内存模型介绍
  2. XML中的树形结构与DOM文档对象模型的示例代码(图)
  3. 芋道 Spring Boot 消息队列 RocketMQ 入门
  4. 面试官再问我如何保证 RocketMQ 不丢失消息,这回我笑了!
  5. RocketMQ 源码分析 —— 定时消息与消息重试
  6. 消息中间件 RocketMQ 源码解析 —— 调试环境搭建
  7. Redis 哈希结构内存模型剖析
  8. 分布式消息队列 RocketMQ源码解析:事务消息
  9. 分布式消息队列 RocketMQ源码解析:Filtersrv

随机推荐

  1. 啊这
  2. 怎样编写更好的 JavaScript 代码[每日前
  3. Maven设置utf8编码格式
  4. 11个顶级 JavaScript 日历插件[每日前端
  5. Centos给文件设置了777权限仍不能访问解
  6. 如何开发跨框架组件?[每日前端夜话0xA6]
  7. 前端与后端开发中技术差异的全面对比[每
  8. k8s-kubernetes 入门
  9. 用 ref 访问 Vue.js 程序中的 DOM[每日前
  10. Android(安卓)在一个程序中启动另一个程