Limitowanie OPS
• tech • 1558 słów • 8 minut czytania
Spędziłem ostatnio trochę czasu nad tematem limitowania operacji wykonywanych w określonym przedziale czasu. Sama idea zapoczątkowana została potrzebą szybkiej implementacji ograniczenia szybkości łącza, czyli typowe limitowanie transferu na potrzeby aplikacji, aby jeden program nie zajmował wszystkich dostępnych zasobów sieciowych. Nie będzie to typowe zagadnienie klasyfikowania, kolejkowania operacji lub kształtowania ruchu. Będzie to przedstawienie bardzo prostej metody, dosyć często wykorzystywanej w różnych aplikacjach.
Idea
Działanie jest bardzo proste, poniekąd podobne do typowych metod kontrolowania ruchu sieci, bazujący na odrzucaniu lub opóźnianiu pakietów. W istocie operacje zostają sztucznie opóźniane i wydłużane, aby w danej jednostce czasu nie przekroczyć wykonania określonej ich ilości.
Dla ułatwienia oprzyjmy się o sieciowy przykład. Funkcją $F$ możemy opisać przepływ danych w sieci, gdzie jej wartość od czasu - $F(t)$ - określać będzie tempo ruchu w sieci w danej chwili $t$. Ilość danych wysłanych w określonym przedziale czasu możemy przedstawić za pomocą $\int_x^y F(t) dt$, co odpowiada transmisji danych od chwili $x$ do chwili $y$. Na przykład funkcja $F_1(t)=D \delta(t-a) dt$, gdzie $\delta(t)$ jest deltą Diraca, odpowiada wysłaniu $D$ danych w chwili $a$, więc wynika z tego, że $\int_x^y F_1(t) = D$, gdy $x \le a \le y$.
Dochodząc do wniosku, chcąc ograniczyć szybkość przesyłania danych do wartości nie przekraczającej $R$ w przedziale czasu $W$, gdzie $W >= y-x$, zwanym oknem, należy spełnić następujący warunek:
$$ \int_x^y F(t) dt <= R(W) $$
Mówiąc ludzkim językiem, w czasie W
możemy wysłać maksymalnie R
danych i powinniśmy tak kontrolować i zarządzać wysyłaniem danych, aby ten warunek zawsze pozostał spełniony.
Porzućmy dalej wzory i opiszmy w skrócie działanie, jakie chcemy osiągnąć i rozważmy możliwe przypadki.
Jak wiemy, sumarycznie w czasie określanym przez okno W
nie możemy wysłać większej ilości danych, niż R
. A zatem, każde kolejne żądanie wysłania danych, powyżej ustalonego limitu, należy opóźnić o czas pozostały do zamknięcia bieżącego okna (czyli do czasu otwarcia kolejnego). Oczywiście dotyczy to tylko sytuacji, kiedy żądanie te napłynie w tym samym oknie, w którym wykorzystano już limit transferu. W innym przypadku, przetwarzamy żądanie.
Musimy pamiętać również, że żądania mogą napływać z różną częstotliwością oraz różną ilością danych do wysłania. Gdy ilość danych w jednym zadaniu przekracza limit, należy przetworzyć tyle danych ile jeszcze można wysłać w bieżącym oknie, a następnie wstrzymać się do czasu nowego okna i tak dalej, aż całe żądanie zostanie przetworzone.
Wszystko to opisane wyżej, można byłoby w prosty i ładny sposób przedstawić na wykresie, ale niestety takowego nie będzie.
Powyższy opis oparto na przykładzie ograniczania przepustowości wysyłania danych w sieci. W istocie, sposób ten nadaje się do limitowania dowolnych operacji, więc docelowo, algorytm operuje na żetonach (tokens). Każdy żeton reprezentuje pozwolenie na wykonanie określonej ilości operacji przez funkcję w danym kwancie czasu, aby założenia limitowania zostały spełnione. Sposób, w jaki żetony interpretowane są przez funkcję, zależy od jej implementacji.
Implementacja
Biorąc pod uwagę możliwość operowania na różnych typach i funkcjach, implementacja oparta jest na szablonach. Pozwala to w pełni “przełożyć” ideę działania w uogólniony algorytm, który w prosty sposób za pomocą klas cech i wytycznych, może zostać wykorzystany do limitowania dowolnych operacji w określonej jednostce czasu.
Cała zawartość znajduje się w przestrzeni nazw lops
. Sercem limitera jest klasa Limiter
:
template<
typename Token, // token type
typename Time, // time type
ProcessType Process = ProcessAll, // process algorithm
typename TokenTraits = TokenTraits<Token>, // token traits - is_eof/is_error
typename TimePolicy = NativeTimePolicy<Time> // time policy - time/sleep
>
class Limiter { ... };
Dostarcza ona get/set-tery do wartości rate
, określającej ograniczenie operacji przypadających na 1 sekundę oraz operator wywołania, który zastępuje wywołanie limitowanej funkcji.
Operator ten przyjmuje obiekt spełniający koncept callable, czyli cokolwiek co da się wywołać i zachowuje się podobnie do funkcji, może to być funkcja, funktor, lambda, o określonym prototypie:
Token func(Token token);
Funkcją ta przyjmuje wartość żetonów jaka może wykorzystać, zwraca wartość wykorzystanych żetonów.
Parametr Token
określa typ używany do reprezentacji żetonów. Może to być dowolny typ, ale musi zachowywać się jak typ liczbowy - posiadać operatory matematyczne.
Klasa cech żetonów TokenTraits
, dostarcza informacji o sposobie interpretacji wartości żetonów zwracanych przez funkcję użytkownika. W jakiś sposób musimy wiedzieć, interpretować jej wyniki, szczególnie w dwóch istotnych sytuacjach - końca przetwarzania danych i wystąpienia błędu (w sytuacjach, gdy funkcja nie rzuca wyjątkami). Jej definicja jest prosta:
template<typename T>
struct TokenTraits {
static inline bool is_eof(T token);
static inline bool is_error(T token);
};
Parametr typu Time
, określa typ używany do przechowywania czasu, wraz z TimePolicy
dostarcza pełnych informacji niezbędnych do kontrolowania czasem. TimePolicy
implementuje dwa niezbędne do działania limitera funkcjonalności - pobieranie upływu czasu i wstrzymanie działania bieżącego wątku na określony czas. Jej koncept przedstawia się następująco:
template<typename T>
struct TimePolicy {
static void sleep(T usec);
static T time();
};
Znacznik czasu zwracany przez funkcję time
nie musi w żaden sposób odnosić się do realnego czasu, wymagane jest jedynie to, aby na podstawie kolejnych pobrań wartości można było wyliczyć upływ czasu, korzystając ze wzoru $d_t= t_1-t_0$.
Standardowo biblioteka zawiera definicje NativeTimePolicy
, wykorzystujący w implementacji systemowe funkcje i elementy, specjalizowany do 64-bitowego typu przechowywującego czas. Bezproblemowo powinna działać pod systemami Microsoftu i posixowymi.
Pozostał jeszcze argument typu enum
definiujący w czasie kompilacji używany algorytm przy wywołaniu funkcji - ProcessType
. Obecnie są dwie implementacje, a raczej dwa sposoby wywoływania funkcji użytkownika przez limiter, zależnie od sposobu używania oryginalnej funkcji: ProcessOne
i ProcessAll
.
Przypuśćmy, że użytkownik wykorzystuje funkcje write
do zapisywania danych do pliku, albo wysyłania w sieć. Jej prototyp jest typowy, jak na takie zastosowanie:
int write(const char* buffer, int length);
ProcessOne
ogranicza się do jednokrotnego wywołania funkcji użytkownika przez limiter, oczywiście z odpowiednią do bieżącej sytuacji wartością żetonów. Jej najczęstsze zastosowanie sprowadza się do zastąpienia kodu użytkownika, który sam zapewnia wysłanie wszystkich danych. Dla przykładowej funkcji write
, kod ten przedstawiałby się następująco:
while (length) {
int ret = write(buffer, length);
if (ret == 0)
break;
buffer += ret;
length -= ret;
}
Limitowanie zapisywania danych w powyższym kodzie, ograniczałoby się do podmiany wywołania funkcji write
na obiekt limitera:
typedef limiter::Limiter<uint32_t, uint64_t> WriteLimiter;
WriteLimiter writer;
...
while (length) {
int ret = writer(
[&buffer](int token) { return write(buffer, token); },
length
);
if (ret == 0)
break;
buffer += ret;
length -= ret;
}
ProcessAll
w przeciwieństwie do ProcessOne
, tak długo będzie wywoływał funkcje użytkownika, dopóki nie przetworzy wszystkich danych jakie zlecił użytkownik, lub nie zwróci wartości interpretowanej jako eof.
Sposób ten idealnie nadaje się do kodu użytkownika, w którym następuje jedno, pojedyncze wywołanie funkcji write
:
int ret = write(buffer, length);
wystarczy zamienić na obiekt limitera z odpowiednią funkcją callback:
typedef limiter::Limiter<uint32_t, uint64_t> WriteLimiter;
WriteLimiter writer;
...
int ret = writer(
[&buffer](int token) -> int {
int ret = write(buffer, token);
buffer += ret;
return ret;
},
length
);
Jeśli zerkniemy do definicji funkcji process
dla algorytmu ProcessAll
, zobaczymy, że znajduje się w niej podobny mechanizm, z pętlą zapewniającą przetworzenie wszystkich danych, podobnie jak to miało miejsce w kodzie użytkownika przedstawionego nieco wyżej (dla ProcessOne
).
Oczywiście, w miejsce ProcessOne
, możemy bez problemu zastosować ProcessAll
, ale wtedy niepotrzebnie kompilator wygeneruje kod, w którym pętla zawarta w pętli de facto będą robiły to samo. Niepotrzebne, może bez sensowne dublowanie, ale będzie zadziała to poprawnie. Niemniej warto dobierać odpowiedni sposób przetwarzania dla danej sytuacji.
W istocie wybór typu paradoksalnie jest odwrotnością tego co robi kod użytkownika - gdy przetwarza wszystko w pętli, wybieramy ProcessOne
, gdy wywołuje tylko raz funkcje użytkownika, wybieramy ProcessAll
.
W implementacji wykorzystano popularny idom int-to-type i static call dispatch, do wyboru w czasie kompilacji odpowiedniej funkcji process
na podstawie wartości ProcessType
.
Całym zrządzaniem i kontrolowaniem czasem, żetonami i wywoływaniem funkcji użytkownika zajmuje się kod zawarty w funkcji do_process
:
template<typename T>
Token do_process(T func, Token token) {
// process window stuff
Time now = TimePolicy::time();
Time inv = now - m_start;
if (inv >= m_interval) {
// in new window
m_start = now - (inv % m_interval);
m_used = 0;
} else if (m_used >= m_value) {
// sleep rest of window time
TimePolicy::sleep(m_interval - inv);
m_start = TimePolicy::time();
m_used = 0;
}
// calculate token chunk for current window
Token chunk = m_value - m_used;
chunk = std::min(chunk, token);
// call user function
Token ret = func(chunk);
if ( !TokenTraits::is_error(ret) )
m_used += ret;
return ret;
}
Kod ten jest implementacją przedstawionej wyżej idei ograniczania i limitowania operacji. Cały kod można podzielić na trzy części, fragment odpowiedzialny za sterowane oknem i ewentualne wstrzymywanie działania, kod wyliczający wartość żetonów, jaka została przypisana do bieżącego wywołania i kod odpowiedzialny za sterowanie funkcją użytkownika.
Eksperymenty
Tymczasem mogę zachęcić do eksperymentów z przedstawiona wyżej implementacją, możliwe, że coś da się zrobić inaczej, lepiej, lub w jakikolwiek inny sposób uprościć, albo rozwinąć…
Pełne źródła implementacji dostępne są na moim githubie, jako gist 5425750, na licencji MIT (jeśli jakaś musi być).
Zastanawiam się jak można wykorzystać przedstawiony mechanizm do wspólnego limitowania określonej puli operacji, tak, aby cała pula objęta była tym samym limitem. Pierwsza myśl jaka przychodzi do głowy to używanie tego samego obiektu limitera dla wybranego zbioru operacji. W aplikacjach wielowątkowych pewnie będzie trzeba “oznaczyć” ciało operatora wywołania jako strefę krytyczną, aby tylko jeden watek w danym czasie mógł wykonywać operacje. Ciekawe czy są jakieś większe problemy w takim zastosowaniu. Zastanawiam się również, czy istnieje jeszcze inne proste wykorzystanie przedstawionej implementacji (może z małymi zmianami) w celu ograniczania puli tych samych operacji, pochodzących z różnych źródeł.
Dodatkowym rozwinięciem mechanizmu limitowania jest wprowadzenie adekwatnej funkcjonalności monitorującej operacje, w celu określania szybkości przetwarzania operacji. Niezależny mechanizm, mógłby być wykorzystywany np. do mierzenia transferu danych w sieci. Pomyślę o tym…
Komentarze (0)