LOGO OA教程 ERP教程 模切知识交流 PMS教程 CRM教程 开发文档 其他文档  
 
网站管理员

C# 消息队列(消息队列的编程实现)

admin
2025年4月24日 0:36 本文热度 36

本章内容:
● 消息队列概述
● 消息队列体系结构
● 消息队列管理工具
● 编程实现消息队列
● 课程订单示例应用程序
● 消息队列和WCF

消息队列的编程实现

既然理解了消息队列的体系结构之后,就可以探讨其编程了。下面几节将学习如何创建和控制 队列,如何发送和接收消息。

还要构建一个小型课程订单应用程序,它由发送部分和接收部分组成。

创建消息队列

创建消息队列 前面了解了如何使用Computer Management 实用程序创建消息队列。消息队列还可以用 MessageQueue 类的Create()方法以编程方式创建。
在 Create()方法中,必须传递新队列的路径。路径包括队列所在主机的名称和队列的名称。在下 面的例子中,要在本地主机上创建MyNewPublicQueue 队列。为了创建私有队列,路径名必须包含 Private\MyNewPrivateQueue。

调用 Create()方法之后,就可以修改队列的属性。例如,使用Label 属性,把队列的标签设置为 Demo Queue。示例程序把队列的路径和格式名写到控制台上。格式名用UUID 自动创建,UUID 可 用于访问队列,且无须服务器名:

using System;
using System.Messaging;
namespace Wrox.ProCSharp.Messaging
{
class Program
{
    static void Main()
    {
        using (var queue = MessageQueue.Create(@".\MyNewPublicQueue"))
        {
            queue.Label = "Demo Queue";
            Console.WriteLine("Queue created:");
            Console.WriteLine("Path: {0}", queue.Path);
            Console.WriteLine("FormatName: {0}", queue.FormatName);
        }
    }
  }
}

创建队列时需要管理权限。通常不希望应用程序的用户拥有管理权限。这就是队 列通常用安装程序创建原因.

查找队列

路径名和格式名可以用于标识队列。要查找队列,必须区分公共队列和私有队列。公共队列在 Active Directory 中发布。对于这些队列,无须知道它们所在的系统。只有在己知队列所在系统私有 队列名称时才能找到私有队列。
在 Active Directory 域中搜索队列的标签、类别或格式名,就可以找到公共队列。还可以获得计算 机上的所有队列。MessageQueue 类的静态方法GetPublicQueuesByLabel()、GetPublicQueuesByCategory() 和GetPublicQueuesByMachine()可以搜索队列。GetPublicQueues()方法返回包含域中所有公共队列的 数组。

using System;
using System.Messaging;
namespace Wrox.ProCSharp.Messaging
{
    class Program
    {
      static void Main()
      {
          foreach (var queue in MessageQueue.GetPublicQueues())
          {
              Console.WriteLine(queue.Path);
          }
      }
    }
}

GetPublicQueues()方法是重载的。它的一个重载版本允许传递MessageQueueCriteria 类的一个实 例。利用这个类可以搜索在某个时刻之前或之后创建或修改的队列,还可以查找队列的类别、标签 或计算机名。

可以使用静态方法GetPrivateQueuesByMachine()搜索私有队列。这个方法返回指定系统中的所 有私有队列。

打开已知队列

如果队列名已知,就不需要搜索它。使用路径或格式名就可以打开队列。路径或格式名都在 MessageQueue 类的构造函数中设置。

路径名

路径指定了打开队列需要的计算机名和队列名。下面的代码示例打开本地主机上的 MyPublicQueue 队列。为了确定队列是否存在,可以使用静态方法MessageQueue.Exists():

using System;
using System.Messaging;
namespace Wrox.ProCSharp.Messaging
{
    class Program
    {
        static void Main()
        {
            if (MessageQueue.Exists(@".\mynewpublicqueue"))
            {
                var queue = new MessageQueue(@".\mynewpublicqueue");
                //...
            }
            else
            {
                Console.WriteLine("Queue .\MyPublicQueue not existing");
            }
        }
    }
}

根据队列的类型,在打开队列时需要不同的标识符。下表 列出了指定类型的队列名的语法。

image

在使用路径名打开公共队列时,需要传递计算机名。如果计算机名未知,则可以使用格式名代 替。私有队列的路径名只能在本地系统上使用,必须使用格式名远程访问私有队列。

格式名

除了路径名之外,还可以使用格式名打开队列。格式名用于在Active Directory 中搜索队列,获 得队列所在的主机。在断开连接的环境下,在发送消息时队列不能到达,此时就需要使用格式名:

MessageQueue queue = new MessageQueue(
@"FormatName:PUBLIC=09816AFF-3608-4c5d-B892-69754BA151FF");

格式名还有一些其他用途。它可以用于打开私有队列,并指定要使用的协议:
要 访 问 私有队列, 必须给构造函数传递字符串 FormatName:PRIVATE=MachineGUID\QueueNumber。在创建队列时,会生成私有队列的队 列号。队列号在\System32\msmq\storage\lqs 目录下。
使用 FormatName:DIRECT=Protocol:MachineAddress\QueueName,可以指定用于发送消息的 协议。Message Queuing 3.0 及以后版本支持HTTP 协议。

FormatName:DIRECT=OS:MachineName\QueueName 是使用格式名指定队列的另一种方式。 此时不需要指定协议,但仍可以使用计算机名和格式名。

发送消息

可以使用MessageQueue 类的Send()方法给队列发送消息。作为参数传递给Send()方法的对象序 列化到相关联的队列上。Send()方法是重载的,这样才能传递标签和MessageQueueTransaction 对象。 Message Queuing 的事务行为在后面论述。 下面的代码示例先检查队列是否存在,如果不存在,就创建一个队列。接着打开队列,使用Send() 方法给队列发送Sample Message 消息。 路径名给服务器名指定“.”,表示它是本地系统。私有队列的路径名只能在本地使用。

using System;
using System.Messaging;
namespace Wrox.ProCSharp.Messaging
{
    class Program
    {
        static void Main()
        {
            try
            {
                if (!MessageQueue.Exists(@".\Private$\MyPrivateQueue"))
                {
                    MessageQueue.Create(@".\Private$\MyPrivateQueue");
                }
                var queue = new MessageQueue(@".\Private$\MyPrivateQueue");
                queue.Send("Sample Message", "Label");
          }
          catch (MessageQueueException ex)
          {
              Console.WriteLine(ex.Message);
          }
      }
  }
}

消息格式化程序

消息传输给队列的格式取决于格式化程序。MessageQueue 类有一个Formatter 属性,通过它可 以指定格式化程序。默认的格式化程序XmlMessageFormatter 会用XML 语法格式化消息,如前面的 例子所示。 消息格式化程序实现IMessageFormatter 接口。System.Messaging 名称空间中有3 个消息格式化 程序: XmlMessageFormatter 是默认的格式化程序,它使用XML 序列化对象,XML 格式的内容详。

使用 BinaryMessageFormatter,可以用二进制格式对消息进行序列化。这些消息比使用XML 格式化的消息短。

ActiveXMessageFormatter 是一个二进制格式化程序,这样可以用COM 对象读写消息。使 用这个格式化程序,可以用.NET 类把消息写入队列中,使用COM 对象从队列中读取消息, 反之亦然。

发送复杂的消息

除了传递字符串之外,还可以给MessageQueue 类的Send()方法传递对象。虽然该类的类型必 须满足一些特定的要求,但它们取决于格式化程序。

对于二进制格式化程序,该类必须用[Serializable]属性序列化。使用.NET 运行库的序列化功能, 序列化所有字段(包括私有字段)。实现ISerializable 接口就可以定义自定义序列化。

XML 序列化在使用XML 格式化程序时进行。在XML 序列化过程中,会序列化所有公共字段 和属性。使用System.Xml.Serialization 名称空间中的属性可以影响XML 序列化。

接收消息

要读取消息,也可以使用MessageQueue 类。通过Receive()方法可以读取一条消息,再将该消 息从队列中删除。如果使用不同的优先级发送消息,就读取优先级最高的消息。读取优先级相同的 消息时,第一条发送的消息不一定是第一条读取的消息,因为消息在网络中的传递顺序无法保证。 要保证发送顺序和读取顺序相同,可以使用事务消息队列。

在下面的例子中,要从私有队列MyPrivateQueue 中读取一条消息。之前把一个简单的字符串传 递给该消息。在使用XmlMessageFormatter 格式化程序读取消息时,必须把要读取的对象的类型传 递给该格式化程序的构造函数。在本例中,将System.String 类型传递给XmlMessageFormatter 的构 造函数的参数数组。这个构造函数可以接收一个String 数组,该数组包含要作为字符串传递的类型; 也可以接收一个Type 数组。

用 Receive()方法读取消息,再把消息正文写入控制台中:

using System;
using System.Messaging;
namespace Wrox.ProCSharp.Messaging
{
    class Program
    {
        static void Main()
        {
            var queue = new MessageQueue(@".\Private$\MyPrivateQueue");
            queue.Formatter = new XmlMessageFormatter(
new string[] {"System.String"});
            Message message = queue.Receive();
            Console.WriteLine(message.Body);
      }
  }
}

Receive()方法将同步执行,如果队列中没有消息,它就会等待队列中有消息时再执行。

枚举消息

除了使用 Receive()方法逐条消息地读取之外,还可以使用枚举器遍历所有消息。因为 MessageQueue 类实现IEnumerable 接口,所以可以在foreach 语句中使用。使用迭代器时,虽然消息 不会从队列中删除,但可以查看消息从而获得它们的内容:

var queue = new MessageQueue(@".\Private$\MyPrivateQueue");
queue.Formatter = new XmlMessageFormatter(
new string[] {"System.String"});
foreach (Message message in queue)
{
    Console.WriteLine(message.Body);
}

除了使用IEnumerable 接口外,还可以使用MessageEnumerator 类。虽然MessageEnumerator 类实现 IEnumerator 接口,但它有更多功能。实现IEnumerable 接口,就表示不从队列中删除消息。 MessageEnumerator 类的RemoveCurrent()方法可以从枚举器的当前光标位置删除消息。

在下面的例子中,使用MessageQueue 类的GetMessageEnumerator()方法访问MessageEnumerator 类。通过MessageEnumerator 类的MoveNext()方法,可以逐条查看消息。MoveNext()方法重载为允 许把一个时间段作为参数。这是使用这个枚举器的一个主要优点。现在,线程可以在指定的时间段 内等待消息到达队列,之后就不等待了。IEnumerator 接口定义的Current 属性返回消息的一个引用:

var queue = new MessageQueue(@".\Private$\MyPrivateQueue");
queue.Formatter = new XmlMessageFormatter(
new string[] {"System.String"});
using (MessageEnumerator messages = queue.GetMessageEnumerator())
{
    while (messages.MoveNext(TimeSpan.FromMinutes(30)))
    {
        Message message = messages.Current;
        Console.WriteLine(message.Body);
    }
}

异步读取

MessageQueue 类的Receive()方法会等到队列中的消息可以读取为止。为了避免阻碍线程的执 行,可以在Receive()方法的一个重载版本中指定一个超时期限。要在超时后读取队列中的消息,必 须再次调用Receive()方法。除了轮询消息外,还可以调用BeginReceive()异步方法。在使用 BeginReceive()开始异步读取消息之前,应设置ReceiveCompleted 事件。ReceiveCompleted 事件需要 ReceiveCompletedEventHandler 委托,在消息到达队列并可以读取时该委托引用要调用的方法。在下 面的例子中,把MessageArrived()方法传递给ReceivedCompletedEventHandler 委托:

var queue = new MessageQueue(@".\Private$\MyPrivateQueue");
queue.Formatter = new XmlMessageFormatter(
new string[] { "System.String" });
queue.ReceiveCompleted += MessageArrived;
queue.BeginReceive();
// thread does not wait

MessageArrived()处理程序方法需要两个参数。第一个参数是MessageQueue 事件源。第二个参 数是ReceiveCompletedEventArgs 类型,它包含消息和异步结果。在下面的例子中,调用队列中的 EndReceive()方法,以获得异步方法的结果,即消息:

public static void MessageArrived(object source, ReceiveCompletedEventArgs e)
{
    MessageQueue queue = (MessageQueue)source;
    Message message = queue.EndReceive(e.AsyncResult);
    Console.WriteLine(message.Body);
}


阅读原文:原文链接


该文章在 2025/4/24 10:02:21 编辑过
关键字查询
相关文章
正在查询...
点晴ERP是一款针对中小制造业的专业生产管理软件系统,系统成熟度和易用性得到了国内大量中小企业的青睐。
点晴PMS码头管理系统主要针对港口码头集装箱与散货日常运作、调度、堆场、车队、财务费用、相关报表等业务管理,结合码头的业务特点,围绕调度、堆场作业而开发的。集技术的先进性、管理的有效性于一体,是物流码头及其他港口类企业的高效ERP管理信息系统。
点晴WMS仓储管理系统提供了货物产品管理,销售管理,采购管理,仓储管理,仓库管理,保质期管理,货位管理,库位管理,生产管理,WMS管理系统,标签打印,条形码,二维码管理,批号管理软件。
点晴免费OA是一款软件和通用服务都免费,不限功能、不限时间、不限用户的免费OA协同办公管理系统。
Copyright 2010-2025 ClickSun All Rights Reserved