Saga паттерн и распределенные транзакции

Saga паттерн и распределенные транзакции

Saga паттерн и распределенные транзакции помогают отменить операцию которая выполняется в несколько шагов, которые вместе можно назвать как консистентная операция (транзакция).

Проблема и контекст

Приложения, работающие в облаке, часто изменяют данные. Эти данные могут быть распределены по источникам данных, хранящихся в различных географических точках. Во избежание конфликтов и повышения производительности в распределенной системе, приложение не должно пытаться обеспечить высокую согласованность транзакций. Скорее, приложение должно реализовывать "возможную согласованность" (eventual consistency). В этой модели типичная бизнес-операция состоит из серии автономных шагов. В то время как эти шаги выполняются, общее представление о состоянии системы может быть непоследовательным, но когда операция завершена и все шаги выполнены, система должна снова стать согласованной.

Серьезной проблемой в модели возможной согласованности (eventual consistency) является то, как обработать шаг, который не удалось обработать. В этом случае может потребоваться отменить всю работу, выполненную предыдущими шагами операции. Однако данные нельзя просто откатить, потому что другие параллельные экземпляры приложения, возможно, уже изменили эти данные. Даже в тех случаях, когда данные не были изменены параллельным экземпляром, отмена шага может быть не просто вопросом восстановления исходного состояния. Может возникнуть необходимость применить различные правила, специфичные для бизнеса.

Транзакция - это логически атомарная единица работы, которая может охватывать несколько запросов к базе данных. Все транзакции должны следовать ACID требованиям:

Atomicity — Атомарность

Атомарность гарантирует, что никакая транзакция не будет зафиксирована в системе частично. Будут либо выполнены все её подоперации, либо не выполнено ни одной.

Consistency — Согласованность

Транзакция, достигающая своего нормального завершения (EOT — end of transaction, завершение транзакции) и, тем самым, фиксирующая свои результаты, сохраняет согласованность базы данных. Другими словами, каждая успешная транзакция по определению фиксирует только допустимые результаты. Это условие является необходимым для поддержки четвёртого свойства.

Isolation — Изолированность

Во время выполнения транзакции параллельные транзакции не должны оказывать влияния на её результат (Уровни изоляции транзакций)

Durability — Стойкость

Стойкость — если пользователь получил подтверждение от системы, что транзакция выполнена, он может быть уверен, что сделанные им изменения не будут отменены из-за какого-либо сбоя.

Решение

Решением проблемы описанной выше может стать Compensating Transaction Pattern. Таким шаблоном может выступить Saga pattern.

Когда каждый сервис имеет свою собственную базу данных и бизнес-транзакция охватывает несколько сервисов, как мы обеспечиваем согласованность данных между сервисами. Каждый запрос имеет компенсационный запрос, который выполняется при сбое запроса. Это может быть реализовано двумя способами:

  • Choreography (Хореография) — при отсутствии центра координации, каждый сервис создает и слушает события другого сервиса и решает, следует ли предпринять действие или нет. Хореография это способ указать, как две или более сторон; ни один из которых не имеет никакого контроля над процессами других сторон, или, возможно, какой-либо видимости этих процессов - не может координировать свои действия и процессы для обмена информацией и ценностями. Используйте хореографию, когда требуется координация между областями контроля / видимости. Вы можете думать о хореографии в простом сценарии как о сетевом протоколе. Это диктует приемлемые образцы запросов и ответов между сторонами.

хореография saga

  • Orchestration (Оркестрация) — оркестратор (объект) берет на себя ответственность за принятие сагой решений и последовательность бизнес-логики. когда у вас есть контроль над всеми участниками процесса. когда все они находятся в одной области контроля, и вы можете контролировать поток действий. Это, конечно, чаще всего, когда вы указываете бизнес-процесс, который будет выполняться внутри одной организации, которую вы контролируете.

оркестрация

Sagas исходят из осознания того, что особо долгоживущие транзакции, а также далеко распределенные транзакции по границам местоположения и / или доверия не могут быть легко обработаны с использованием классической модели ACID c Two-Phase Commit принципом. 

Two-Phase Commit

saga two-phase commit

  • Фаза подготовки: На этом этапе управляющий узел запрашивает все участвующие узлы, готовы ли они принять транзакцию. Участвующие узлы тогда ответили бы да или нет.
  • Фаза фиксации. Затем, если все узлы ответили утвердительно, управляющий узел попросит их зафиксировать, в противном случае, даже если один узел ответил отрицательно, он попросит откатить назад.

Вместо этого Saga разбивает работу на отдельные транзакции, последствия которых могут быть каким-то образом отменены после того, как работа была выполнена и зафиксирована.

saga flow

На рисунке выше показана простая реализация Saga паттерна. Если вы бронируете маршрут путешествия, вам нужен автомобиль, отель и рейс. Если вы не можете получить все из них, это, вероятно, не стоит ехать. Также совершенно очевидно, что вы не можете подключить всех этих провайдеров к распределенной транзакции ACID. Вместо этого у вас будет действие по бронированию проката автомобилей, которое знает, как выполнить бронирование, а также как отменить его - один для отеля и один для рейсов.

Операции сгруппированы в composite job (routing slip), которое передается по цепочке действий. При желании вы можете подписать / зашифровать элементы бланка маршрутизации, чтобы их мог понять и манипулировать только предполагаемый получатель. Когда действие завершается, оно добавляет запись о завершении в квитанцию маршрутизации вместе с информацией о том, где может быть достигнута ее компенсационная операция (например, через очередь). Когда действие завершается неудачей, оно очищается локально, а затем отправляет бланк маршрутизации назад на адрес компенсации последнего завершенного действия, чтобы развернуть результат транзакции.

Схема отмены транзакции будет выглядеть следующим образом:

cancel

Так же стоит отметить, что Saga очень сильно соприкасается с парадигмой event sourcingа.

Проблемы и вопросы при реализации

Принимая решение о том, как реализовать этот шаблон, учтите следующие моменты:

  • Может быть нелегко определить, когда произошел сбой шага в операции, реализующей возможной согласованности (eventual consistency). Шаг может не сразу завершиться неудачей, но вместо этого он может заблокироваться. Возможно, потребуется реализовать некоторую форму механизма тайм-аута или даже circuit-breaker-pattern.
  • Логика компенсации не легко обобщается. Компенсирующая транзакция зависит от приложения; он опирается на то, что приложение обладает достаточной информацией, чтобы можно было отменить "эффекты" каждого шага в неудачной операции.
  • Вы должны определить шаги в компенсирующей транзакции как идемпотентные команды. Это позволяет повторить шаги в случае сбоя самой компенсирующей транзакции.
  • Инфраструктура, которая обрабатывает этапы исходной операции и компенсирующую транзакцию, должна быть устойчивой. Он не должен терять информацию, необходимую для компенсации неудачного шага, и должен иметь возможность надежно отслеживать ход выполнения логики компенсации.
  • Компенсирующая транзакция не обязательно возвращает данные в системе в состояние, в котором она находилась в начале исходной операции. Вместо этого он компенсирует работу, выполненную шагами, которые были успешно завершены до сбоя операции.
  • Порядок шагов в компенсирующей транзакции не обязательно должен быть зеркальным отражением шагов в исходной операции. Например, одно хранилище данных может быть более чувствительным к несоответствиям, чем другое, и поэтому сначала необходимо выполнить шаги в компенсирующей транзакции, которая отменяет изменения в этом хранилище.
  • Установка кратковременной блокировки на основе тайм-аута для каждого ресурса, необходимого для завершения операции, и заблаговременное получение этих ресурсов может помочь повысить вероятность успешного завершения всей операции. Работу следует выполнять только после того, как все ресурсы получены. Все действия должны быть завершены до истечения срока действия блокировки.
  • Подумайте об использовании логики повторения, которая более простительна, чем обычно, чтобы минимизировать сбои, которые инициируют компенсирующую транзакцию. Если шаг в операции, которая реализует возможную согласованность, завершается неудачей, попробуйте обработать ошибку как временное исключение и повторите шаг. Прервите операцию и начните компенсирующую транзакцию только в том случае, если шаг не удался повторно или безвозвратно.

Пример реализации Saga pattern в C#

Реализация которую я нашел на gist отлично подходит что б показать пример реализации, но стоит отметить что это не production-ready решение


namespace Sagas
{
    using System;
    using System.Collections.Generic;    

    class Program
    {
        static ActivityHost[] processes;

        static void Main(string[] args)
        {
            var routingSlip = new RoutingSlip(new WorkItem[]
                {
                    new WorkItem<ReserveCarActivity>(new WorkItemArguments{{"vehicleType", "Compact"}}),
                    new WorkItem<ReserveHotelActivity>(new WorkItemArguments{{"roomType", "Suite"}}),
                    new WorkItem<ReserveFlightActivity>(new WorkItemArguments{{"destination", "DUS"}})
                });


            // imagine these being completely separate processes with queues between them
            processes = new ActivityHost[]
                                {
                                    new ActivityHost<ReserveCarActivity>(Send),
                                    new ActivityHost<ReserveHotelActivity>(Send),
                                    new ActivityHost<ReserveFlightActivity>(Send)
                                };

            // hand off to the first address
            Send(routingSlip.ProgressUri, routingSlip);
        }

        static void Send(Uri uri, RoutingSlip routingSlip)
        {
            // this is effectively the network dispatch
            foreach (var process in processes)
            {
                if (process.AcceptMessage(uri, routingSlip))
                {
                    break;
                }
            }
        }

    }


 
    class ReserveCarActivity : Activity
    {
        static Random rnd = new Random(2);

        public override WorkLog DoWork(WorkItem workItem)
        {
            Console.WriteLine("Reserving car");
            var car = workItem.Arguments["vehicleType"];
            var reservationId = rnd.Next(100000);
            Console.WriteLine("Reserved car {0}", reservationId);
            return new WorkLog(this, new WorkResult { { "reservationId", reservationId } });
        }

        public override bool Compensate(WorkLog item, RoutingSlip routingSlip)
        {
            var reservationId = item.Result["reservationId"];
            Console.WriteLine("Cancelled car {0}", reservationId);
            return true;
        }

        public override Uri WorkItemQueueAddress
        {
            get { return new Uri("sb://./carReservations"); }
        }

        public override Uri CompensationQueueAddress
        {
            get { return new Uri("sb://./carCancellactions"); }
        }
    }

    class ReserveHotelActivity : Activity
    {
        static Random rnd = new Random(1);

        public override WorkLog DoWork(WorkItem workItem)
        {
            Console.WriteLine("Reserving hotel");
            var car = workItem.Arguments["roomType"];
            var reservationId = rnd.Next(100000);
            Console.WriteLine("Reserved hotel {0}", reservationId);
            return new WorkLog(this, new WorkResult { { "reservationId", reservationId } });
        }

        public override bool Compensate(WorkLog item, RoutingSlip routingSlip)
        {
            var reservationId = item.Result["reservationId"];
            Console.WriteLine("Cancelled hotel {0}", reservationId);
            return true;
        }

        public override Uri WorkItemQueueAddress
        {
            get { return new Uri("sb://./hotelReservations"); }
        }

        public override Uri CompensationQueueAddress
        {
            get { return new Uri("sb://./hotelCancellations"); }
        }
    }

    class ReserveFlightActivity : Activity
    {
        static Random rnd = new Random(3);

        public override WorkLog DoWork(WorkItem workItem)
        {
            Console.WriteLine("Reserving flight");
            var car = workItem.Arguments["fatzbatz"]; // this throws
            var reservationId = rnd.Next(100000);
            Console.WriteLine("Reserved flight {0}", reservationId);
            return new WorkLog(this, new WorkResult { { "reservationId", reservationId } });
        }

        public override bool Compensate(WorkLog item, RoutingSlip routingSlip)
        {
            var reservationId = item.Result["reservationId"];
            Console.WriteLine("Cancelled flight {0}", reservationId);
            return true;
        }

        public override Uri WorkItemQueueAddress
        {
            get { return new Uri("sb://./flightReservations"); }
        }

        public override Uri CompensationQueueAddress
        {
            get { return new Uri("sb://./flightCancellations"); }
        }
    }


    abstract class Activity
    {
        public abstract WorkLog DoWork(WorkItem item);
        public abstract bool Compensate(WorkLog item, RoutingSlip routingSlip);
        public abstract Uri WorkItemQueueAddress { get; }
        public abstract Uri CompensationQueueAddress { get; }
    }

    class WorkLog
    {
        readonly Type activityType;
        readonly WorkResult result;

        public WorkLog(Activity activity, WorkResult result)
        {
            this.result = result;
            this.activityType = activity.GetType();
        }

        public WorkResult Result
        {
            get { return this.result; }
        }

        public Type ActivityType
        {
            get { return this.activityType; }
        }
    }

    class WorkItemArguments : Dictionary<string, object>
    {
    }

    class WorkResult : Dictionary<string, object>
    {
    }


    abstract class WorkItem
    {
        protected WorkItem(WorkItemArguments arguments)
        {
            this.Arguments = arguments;
        }

        public RoutingSlip RoutingSlip { get; set; }
        public WorkItemArguments Arguments { get; set; }
        public abstract Type ActivityType { get; }
    }

    class WorkItem<T> : WorkItem where T : Activity
    {
        public WorkItem(WorkItemArguments args) : base(args)
        {
        }

        public override Type ActivityType
        {
            get { return typeof (T); }
        }
    }

    class RoutingSlip
    {
        readonly Stack<WorkLog> completedWorkLogs = new Stack<WorkLog>();
        readonly Queue<WorkItem> nextWorkItem = new Queue<WorkItem>();

        public RoutingSlip()
        {
        }

        public RoutingSlip(IEnumerable<WorkItem> workItems)
        {
            foreach (var workItem in workItems)
            {
                this.nextWorkItem.Enqueue(workItem);
            }
        }

        public bool IsCompleted
        {
            get { return this.nextWorkItem.Count == 0; }
        }

        public bool IsInProgress
        {
            get { return this.completedWorkLogs.Count > 0; }
        }

        public bool ProcessNext()
        {
            if (this.IsCompleted)
            {
                throw new InvalidOperationException();
            }

            var currentItem = this.nextWorkItem.Dequeue();
            var activity = (Activity) Activator.CreateInstance(currentItem.ActivityType);
            try
            {
                var result = activity.DoWork(currentItem);
                if (result != null)
                {
                    this.completedWorkLogs.Push(result);
                    return true;
                }
            }
            catch (Exception e)
            {
                Console.WriteLine("Exception {0}", e.Message);
            }
            return false;
        }

        public Uri ProgressUri
        {
            get
            {
                if (IsCompleted)
                {
                    return null;
                }
                else
                {
                    return
                        ((Activity) Activator.CreateInstance(this.nextWorkItem.Peek().ActivityType)).
                            WorkItemQueueAddress;
                }
            }
        }

        public Uri CompensationUri
        {
            get
            {
                if (!IsInProgress)
                {
                    return null;
                }
                else
                {
                    return
                        ((Activity) Activator.CreateInstance(this.completedWorkLogs.Peek().ActivityType)).
                            CompensationQueueAddress;
                }
            }
        }

        public bool UndoLast()
        {
            if (!this.IsInProgress)
            {
                throw new InvalidOperationException();
            }

            var currentItem = this.completedWorkLogs.Pop();
            var activity = (Activity) Activator.CreateInstance(currentItem.ActivityType);
            try
            {
                return activity.Compensate(currentItem, this);
            }
            catch (Exception e)
            {
                Console.WriteLine("Exception {0}", e.Message);
                throw;
            }
            
        }
    }

    abstract class ActivityHost
    {
        Action<Uri, RoutingSlip> send;

        public ActivityHost(Action<Uri, RoutingSlip> send)
        {
            this.send = send;
        }

        public void ProcessForwardMessage(RoutingSlip routingSlip)
        {
            if (!routingSlip.IsCompleted)
            {
                // if the current step is successful, proceed
                // otherwise go to the Unwind path
                if (routingSlip.ProcessNext())
                {
                    // recursion stands for passing context via message
                    // the routing slip can be fully serialized and passed
                    // between systems. 
                    this.send(routingSlip.ProgressUri, routingSlip);
                }
                else
                {
                    // pass message to unwind message route
                    this.send(routingSlip.CompensationUri, routingSlip);
                }
            }
        }

        public void ProcessBackwardMessage(RoutingSlip routingSlip)
        {
            if (routingSlip.IsInProgress)
            {
                // UndoLast can put new work on the routing slip
                // and return false to go back on the forward 
                // path
                if (routingSlip.UndoLast())
                {
                    // recursion stands for passing context via message
                    // the routing slip can be fully serialized and passed
                    // between systems 
                    this.send(routingSlip.CompensationUri, routingSlip);
                }
                else
                {
                    this.send(routingSlip.ProgressUri, routingSlip);
                }
            }
        }

        public abstract bool AcceptMessage(Uri uri, RoutingSlip routingSlip);
    }

    class ActivityHost<T> : ActivityHost where T : Activity, new()
    {
        public ActivityHost(Action<Uri, RoutingSlip> send)
            : base(send)
        {
        }

        public override bool AcceptMessage(Uri uri, RoutingSlip routingSlip)
        {
            var activity = new T();
            if (activity.CompensationQueueAddress.Equals(uri))
            {
                this.ProcessBackwardMessage(routingSlip);
                return true;
            }
            if (activity.WorkItemQueueAddress.Equals(uri))
            {
                this.ProcessForwardMessage(routingSlip);
                return true;
            }
            return false;
        }

    }    
}

 

0 1.8K 09.03.2020 13:56

Комментарии:

Пожалуйста авторизируйтесь, чтобы получить возможность оставлять комментарии