Handle all Events for an aggregate

2020-04-14 06:55发布

Please see my first Persistent Subscription below:

 namespace PersistentSubscription
    {
        internal class Program
        {
            private static void Main()
            {
                var subscription = new PersistentSubscriptionClient();
                subscription.Start();
            }
        }

        public class PersistentSubscriptionClient
        {
            private IEventStoreConnection _conn;
            private const string STREAM = "$ce-customer";
            private const string GROUP = "a_test_group";
            private const int DEFAULTPORT = 1113;
            private static readonly UserCredentials User = new UserCredentials("admin", "changeit");
            private EventStorePersistentSubscriptionBase _subscription;

            public void Start()
            {
                var settings = ConnectionSettings.Create(); 

                using (_conn = EventStoreConnection.Create(settings, new IPEndPoint(IPAddress.Loopback, DEFAULTPORT)))
                {
                    _conn.ConnectAsync().Wait();

                    CreateSubscription(); 
                    ConnectToSubscription();

                    Console.WriteLine("waiting for events. press enter to exit");
                    Console.ReadLine();
                }
            }

            private void ConnectToSubscription()
            {
                var bufferSize = 10;
                var autoAck = true;

                Action<EventStorePersistentSubscriptionBase, ResolvedEvent> eventAppeared = EventAppeared; 
                _subscription = _conn.ConnectToPersistentSubscription(STREAM, GROUP, eventAppeared, SubscriptionDropped, User, bufferSize, autoAck);
            }

            private void SubscriptionDropped(EventStorePersistentSubscriptionBase eventStorePersistentSubscriptionBase,
                SubscriptionDropReason subscriptionDropReason, Exception ex)
            {
                ConnectToSubscription();
            }

            private static void EventAppeared(EventStorePersistentSubscriptionBase eventStorePersistentSubscriptionBase,
                ResolvedEvent resolvedEvent)
            {
                MemoryStream stream = new MemoryStream(resolvedEvent.Event.Data);
                IFormatter formatter = new BinaryFormatter();
                stream.Seek(0, SeekOrigin.Begin);
                try
                {
                    CustomerCreated customerCreated = (CustomerCreated)formatter.Deserialize(stream); 
                    Console.WriteLine(customerCreated);
                }
                catch (Exception e)
                {
                    var test = "test";
                }

            }

            private void CreateSubscription()
            {
                PersistentSubscriptionSettings settings = PersistentSubscriptionSettings.Create()
                    .DoNotResolveLinkTos()
                    .StartFromCurrent();

                try
                {
                    _conn.CreatePersistentSubscriptionAsync(STREAM, GROUP, settings, User).Wait();
                }
                catch (AggregateException ex)
                {
                    if (ex.InnerException.GetType() != typeof(InvalidOperationException)
                        && ex.InnerException?.Message != $"Subscription group {GROUP} on stream {STREAM} already exists")
                    {
                        throw;
                    }
                }
            }
        }
    }

and my first client below:

using System;
using System.IO;
using System.Net;
using System.Runtime.Serialization;
using System.Runtime.Serialization.Formatters.Binary;
using System.Text;
using EventStore.ClientAPI;

namespace WritingEvents
{
    class Program
    {
        static void Main(string[] args)
        {
            const int DEFAULTPORT = 1113;
            var settings = ConnectionSettings.Create();
            using (var conn = EventStoreConnection.Create(settings, new IPEndPoint(IPAddress.Loopback, DEFAULTPORT)))
            {
                conn.ConnectAsync().Wait();
                CustomerCreated c1 = new CustomerCreated { Id = Guid.NewGuid(), Name = "Maria" };
                EventData customerCreated1 = GetEventDataFor(c1);
                conn.AppendToStreamAsync("customer-100", ExpectedVersion.Any, customerCreated1).Wait();
            }
        }

        private static EventData GetEventDataFor(CustomerCreated customerCreated)
        {
            IFormatter formatter = new BinaryFormatter();
            MemoryStream stream = new MemoryStream();
            formatter.Serialize(stream, customerCreated);
            byte[] customerCreatedEventByteArray = stream.ToArray();



            return new EventData(
                Guid.NewGuid(),
                "eventType",
                true,
                customerCreatedEventByteArray,
                null
                );
        }
    }

    [Serializable]
    public class CustomerCreated
    {
        public Guid Id { get; set; }
        public string Name { get; set; }
    }
}

I run the server and then then the client. I see an error when deserializing the CustomerCreated event on the server side. The error is: "End of stream was encountered before parsing was completed".

If I change this line:

private const string STREAM = "$ce-customer";

to this:

private const string STREAM = "customer-100";

Then deserialization works correctly on the server side.

How do I handle all customer events - not just customer 100?

I have --run-projections=all when starting Event Store. I have also enabled all projections:

enter image description here

1条回答
别忘想泡老子
2楼-- · 2020-04-14 07:32

This question helped me: Using the Event Store Client API (.NET), how to I write to a stream and link one event to another?

I simply had to change this:

PersistentSubscriptionSettings settings = PersistentSubscriptionSettings.Create()
                .DoNotResolveLinkTos() //Specifically this line
                .StartFromCurrent();

to this:

PersistentSubscriptionSettings settings = PersistentSubscriptionSettings.Create()
                .ResolveLinkTos() //Specifically this line
                .StartFromCurrent();

DoNotResolveLinkTos gets a link to the original event, whereas ResolveLinkTos gets the actual event itself. Therefore I was trying to deserialize the link object, which was causing the exception.

查看更多
登录 后发表回答