Limitowanie OPS

tech • 1558 słów • 8 minut czytania

Spędziłem ostatnio trochę czasu nad tematem limitowania operacji wykonywanych w określonym przedziale czasu. Sama idea zapoczątkowana została potrzebą szybkiej implementacji ograniczenia szybkości łącza, czyli typowe limitowanie transferu na potrzeby aplikacji, aby jeden program nie zajmował wszystkich dostępnych zasobów sieciowych. Nie będzie to typowe zagadnienie klasyfikowania, kolejkowania operacji lub kształtowania ruchu. Będzie to przedstawienie bardzo prostej metody, dosyć często wykorzystywanej w różnych aplikacjach.

Idea

Działanie jest bardzo proste, poniekąd podobne do typowych metod kontrolowania ruchu sieci, bazujący na odrzucaniu lub opóźnianiu pakietów. W istocie operacje zostają sztucznie opóźniane i wydłużane, aby w danej jednostce czasu nie przekroczyć wykonania określonej ich ilości.

Dla ułatwienia oprzyjmy się o sieciowy przykład. Funkcją $F$ możemy opisać przepływ danych w sieci, gdzie jej wartość od czasu - $F(t)$ - określać będzie tempo ruchu w sieci w danej chwili $t$. Ilość danych wysłanych w określonym przedziale czasu możemy przedstawić za pomocą $\int_x^y F(t) dt$, co odpowiada transmisji danych od chwili $x$ do chwili $y$. Na przykład funkcja $F_1(t)=D \delta(t-a) dt$, gdzie $\delta(t)$ jest deltą Diraca, odpowiada wysłaniu $D$ danych w chwili $a$, więc wynika z tego, że $\int_x^y F_1(t) = D$, gdy $x \le a \le y$.

Dochodząc do wniosku, chcąc ograniczyć szybkość przesyłania danych do wartości nie przekraczającej $R$ w przedziale czasu $W$, gdzie $W >= y-x$, zwanym oknem, należy spełnić następujący warunek:

$$ \int_x^y F(t) dt <= R(W) $$

Mówiąc ludzkim językiem, w czasie W możemy wysłać maksymalnie R danych i powinniśmy tak kontrolować i zarządzać wysyłaniem danych, aby ten warunek zawsze pozostał spełniony.

Porzućmy dalej wzory i opiszmy w skrócie działanie, jakie chcemy osiągnąć i rozważmy możliwe przypadki.

Jak wiemy, sumarycznie w czasie określanym przez okno W nie możemy wysłać większej ilości danych, niż R. A zatem, każde kolejne żądanie wysłania danych, powyżej ustalonego limitu, należy opóźnić o czas pozostały do zamknięcia bieżącego okna (czyli do czasu otwarcia kolejnego). Oczywiście dotyczy to tylko sytuacji, kiedy żądanie te napłynie w tym samym oknie, w którym wykorzystano już limit transferu. W innym przypadku, przetwarzamy żądanie.

Musimy pamiętać również, że żądania mogą napływać z różną częstotliwością oraz różną ilością danych do wysłania. Gdy ilość danych w jednym zadaniu przekracza limit, należy przetworzyć tyle danych ile jeszcze można wysłać w bieżącym oknie, a następnie wstrzymać się do czasu nowego okna i tak dalej, aż całe żądanie zostanie przetworzone.

Wszystko to opisane wyżej, można byłoby w prosty i ładny sposób przedstawić na wykresie, ale niestety takowego nie będzie.

Powyższy opis oparto na przykładzie ograniczania przepustowości wysyłania danych w sieci. W istocie, sposób ten nadaje się do limitowania dowolnych operacji, więc docelowo, algorytm operuje na żetonach (tokens). Każdy żeton reprezentuje pozwolenie na wykonanie określonej ilości operacji przez funkcję w danym kwancie czasu, aby założenia limitowania zostały spełnione. Sposób, w jaki żetony interpretowane są przez funkcję, zależy od jej implementacji.

Implementacja

Biorąc pod uwagę możliwość operowania na różnych typach i funkcjach, implementacja oparta jest na szablonach. Pozwala to w pełni “przełożyć” ideę działania w uogólniony algorytm, który w prosty sposób za pomocą klas cech i wytycznych, może zostać wykorzystany do limitowania dowolnych operacji w określonej jednostce czasu.

Cała zawartość znajduje się w przestrzeni nazw lops. Sercem limitera jest klasa Limiter:

template<
	typename Token,									// token type
	typename Time,									// time type
	ProcessType Process = ProcessAll,				// process algorithm
	typename TokenTraits = TokenTraits<Token>,		// token traits - is_eof/is_error
	typename TimePolicy = NativeTimePolicy<Time>	// time policy - time/sleep
>
class Limiter { ... };

Dostarcza ona get/set-tery do wartości rate, określającej ograniczenie operacji przypadających na 1 sekundę oraz operator wywołania, który zastępuje wywołanie limitowanej funkcji.

Operator ten przyjmuje obiekt spełniający koncept callable, czyli cokolwiek co da się wywołać i zachowuje się podobnie do funkcji, może to być funkcja, funktor, lambda, o określonym prototypie:

Token func(Token token);

Funkcją ta przyjmuje wartość żetonów jaka może wykorzystać, zwraca wartość wykorzystanych żetonów.

Parametr Token określa typ używany do reprezentacji żetonów. Może to być dowolny typ, ale musi zachowywać się jak typ liczbowy - posiadać operatory matematyczne.

Klasa cech żetonów TokenTraits, dostarcza informacji o sposobie interpretacji wartości żetonów zwracanych przez funkcję użytkownika. W jakiś sposób musimy wiedzieć, interpretować jej wyniki, szczególnie w dwóch istotnych sytuacjach - końca przetwarzania danych i wystąpienia błędu (w sytuacjach, gdy funkcja nie rzuca wyjątkami). Jej definicja jest prosta:

template<typename T>
struct TokenTraits {

	static inline bool is_eof(T token);
	static inline bool is_error(T token);

};

Parametr typu Time, określa typ używany do przechowywania czasu, wraz z TimePolicy dostarcza pełnych informacji niezbędnych do kontrolowania czasem. TimePolicy implementuje dwa niezbędne do działania limitera funkcjonalności - pobieranie upływu czasu i wstrzymanie działania bieżącego wątku na określony czas. Jej koncept przedstawia się następująco:

template<typename T>
struct TimePolicy {

	static void sleep(T usec);
	static T time();

};

Znacznik czasu zwracany przez funkcję time nie musi w żaden sposób odnosić się do realnego czasu, wymagane jest jedynie to, aby na podstawie kolejnych pobrań wartości można było wyliczyć upływ czasu, korzystając ze wzoru $d_t= t_1-t_0$.

Standardowo biblioteka zawiera definicje NativeTimePolicy, wykorzystujący w implementacji systemowe funkcje i elementy, specjalizowany do 64-bitowego typu przechowywującego czas. Bezproblemowo powinna działać pod systemami Microsoftu i posixowymi.

Pozostał jeszcze argument typu enum definiujący w czasie kompilacji używany algorytm przy wywołaniu funkcji - ProcessType. Obecnie są dwie implementacje, a raczej dwa sposoby wywoływania funkcji użytkownika przez limiter, zależnie od sposobu używania oryginalnej funkcji: ProcessOne i ProcessAll.

Przypuśćmy, że użytkownik wykorzystuje funkcje write do zapisywania danych do pliku, albo wysyłania w sieć. Jej prototyp jest typowy, jak na takie zastosowanie:

int write(const char* buffer, int length);

ProcessOne ogranicza się do jednokrotnego wywołania funkcji użytkownika przez limiter, oczywiście z odpowiednią do bieżącej sytuacji wartością żetonów. Jej najczęstsze zastosowanie sprowadza się do zastąpienia kodu użytkownika, który sam zapewnia wysłanie wszystkich danych. Dla przykładowej funkcji write, kod ten przedstawiałby się następująco:

while (length) {

	int ret = write(buffer, length);
	if (ret == 0)
		break;

	buffer += ret;
	length -= ret;
}

Limitowanie zapisywania danych w powyższym kodzie, ograniczałoby się do podmiany wywołania funkcji write na obiekt limitera:

typedef limiter::Limiter<uint32_t, uint64_t> WriteLimiter;
WriteLimiter writer;

...

while (length) {

	int ret = writer(
		[&buffer](int token) { return write(buffer, token); },
		length
	);
	if (ret == 0)
		break;

	buffer += ret;
	length -= ret;
}

ProcessAll w przeciwieństwie do ProcessOne, tak długo będzie wywoływał funkcje użytkownika, dopóki nie przetworzy wszystkich danych jakie zlecił użytkownik, lub nie zwróci wartości interpretowanej jako eof.

Sposób ten idealnie nadaje się do kodu użytkownika, w którym następuje jedno, pojedyncze wywołanie funkcji write:

int ret = write(buffer, length);

wystarczy zamienić na obiekt limitera z odpowiednią funkcją callback:

typedef limiter::Limiter<uint32_t, uint64_t> WriteLimiter;
WriteLimiter writer;

...

int ret = writer(
	[&buffer](int token) -> int {
		int ret = write(buffer, token);
		buffer += ret;
		return ret;
	},
	length
);

Jeśli zerkniemy do definicji funkcji process dla algorytmu ProcessAll, zobaczymy, że znajduje się w niej podobny mechanizm, z pętlą zapewniającą przetworzenie wszystkich danych, podobnie jak to miało miejsce w kodzie użytkownika przedstawionego nieco wyżej (dla ProcessOne).

Oczywiście, w miejsce ProcessOne, możemy bez problemu zastosować ProcessAll, ale wtedy niepotrzebnie kompilator wygeneruje kod, w którym pętla zawarta w pętli de facto będą robiły to samo. Niepotrzebne, może bez sensowne dublowanie, ale będzie zadziała to poprawnie. Niemniej warto dobierać odpowiedni sposób przetwarzania dla danej sytuacji.

W istocie wybór typu paradoksalnie jest odwrotnością tego co robi kod użytkownika - gdy przetwarza wszystko w pętli, wybieramy ProcessOne, gdy wywołuje tylko raz funkcje użytkownika, wybieramy ProcessAll.

W implementacji wykorzystano popularny idom int-to-type i static call dispatch, do wyboru w czasie kompilacji odpowiedniej funkcji process na podstawie wartości ProcessType.

Całym zrządzaniem i kontrolowaniem czasem, żetonami i wywoływaniem funkcji użytkownika zajmuje się kod zawarty w funkcji do_process:

template<typename T>
Token do_process(T func, Token token) {

	// process window stuff
	Time now = TimePolicy::time();
	Time inv = now - m_start;

	if (inv >= m_interval) {
		// in new window
		m_start = now - (inv % m_interval);
		m_used = 0;

	} else if (m_used >= m_value) {
		// sleep rest of window time
		TimePolicy::sleep(m_interval - inv);
		m_start = TimePolicy::time();
		m_used = 0;
	}

	// calculate token chunk for current window
	Token chunk = m_value - m_used;
	chunk = std::min(chunk, token);

	// call user function
	Token ret = func(chunk);
	if ( !TokenTraits::is_error(ret) )
		m_used += ret;

	return ret;
}

Kod ten jest implementacją przedstawionej wyżej idei ograniczania i limitowania operacji. Cały kod można podzielić na trzy części, fragment odpowiedzialny za sterowane oknem i ewentualne wstrzymywanie działania, kod wyliczający wartość żetonów, jaka została przypisana do bieżącego wywołania i kod odpowiedzialny za sterowanie funkcją użytkownika.

Eksperymenty

Tymczasem mogę zachęcić do eksperymentów z przedstawiona wyżej implementacją, możliwe, że coś da się zrobić inaczej, lepiej, lub w jakikolwiek inny sposób uprościć, albo rozwinąć…

Pełne źródła implementacji dostępne są na moim githubie, jako gist 5425750, na licencji MIT (jeśli jakaś musi być).

Zastanawiam się jak można wykorzystać przedstawiony mechanizm do wspólnego limitowania określonej puli operacji, tak, aby cała pula objęta była tym samym limitem. Pierwsza myśl jaka przychodzi do głowy to używanie tego samego obiektu limitera dla wybranego zbioru operacji. W aplikacjach wielowątkowych pewnie będzie trzeba “oznaczyć” ciało operatora wywołania jako strefę krytyczną, aby tylko jeden watek w danym czasie mógł wykonywać operacje. Ciekawe czy są jakieś większe problemy w takim zastosowaniu. Zastanawiam się również, czy istnieje jeszcze inne proste wykorzystanie przedstawionej implementacji (może z małymi zmianami) w celu ograniczania puli tych samych operacji, pochodzących z różnych źródeł.

Dodatkowym rozwinięciem mechanizmu limitowania jest wprowadzenie adekwatnej funkcjonalności monitorującej operacje, w celu określania szybkości przetwarzania operacji. Niezależny mechanizm, mógłby być wykorzystywany np. do mierzenia transferu danych w sieci. Pomyślę o tym…

Komentarze (0)

Dodaj komentarz

/dozwolony markdown/

/nie zostanie opublikowany/