Specialized components
Background processing
Kubernetes event collector
28 min
overview the kubernetes event collector is a specialized background task within the ir engine that monitors and logs events from the kubernetes cluster where the application is deployed it serves as an operational monitoring system, capturing important cluster activities such as pod startups, failures, scaling operations, and system warnings by collecting and recording these events, the collector provides valuable insights for troubleshooting, operational awareness, and system health monitoring this chapter explores the implementation, functionality, and integration of the kubernetes event collector within the ir engine's background processing system purpose and functionality the kubernetes event collector serves several important purposes operational monitoring tracks system events in the kubernetes environment troubleshooting provides context for diagnosing application issues audit trail creates a historical record of cluster activities health awareness identifies potential problems or anomalies resource tracking monitors scaling and resource allocation events the collector typically captures events such as pod lifecycle events (creation, deletion, failures) deployment scaling operations resource constraints and limits system warnings and errors node related events configuration changes implementation task initialization the kubernetes event collector is initialized when the task server application starts // from src/start ts import collectevents from ' /collect events'; export const start = async () promise\<application> => { const app = await createfeatherskoaapp(servermode task); // initialize the kubernetes event collector collectevents(app); // other initialization code return app; }; the collectevents function receives the application instance, which provides access to the kubernetes client, configuration, and other resources needed for event collection collector setup the collector is implemented in src/collect events ts // simplified from src/collect events ts import config from '@ir engine/server core/src/appconfig'; import { getstate } from '@ir engine/hyperflux'; import { serverstate } from '@ir engine/server core/src/serverstate'; import multilogger from '@ir engine/server core/src/serverlogger'; const logger = multilogger child({ component 'taskserver\ collect events' }); let lasttimestamp string; // main function to fetch and log kubernetes events async function fetchandlogkubernetesevents() { // get the kubernetes client from server state const k8sclient = getstate(serverstate) k8defaultclient; if (!k8sclient) { logger warn('kubernetes client not available'); return; } // get the namespace to monitor from configuration const namespace = config server namespace; logger info(`checking for events in namespace ${namespace}`); try { // query the kubernetes api for events const eventsresponse = await k8sclient listnamespacedevent({ namespace, // in a real implementation, we would filter by timestamp // timeoutseconds 10 }); // process and log the events for (const event of eventsresponse items) { // extract event details const kind = event involvedobject kind; const name = event involvedobject name; const reason = event reason; const message = event message; const timestamp = event lasttimestamp || event firsttimestamp || new date(); // log the event logger info(`k8s event ${kind} ${name} ${reason} ${message} (at ${timestamp})`); } // update the timestamp for the next run lasttimestamp = new date() toisostring(); } catch (error) { logger error('error fetching kubernetes events ', error); } } // main export function that sets up the collector export default (app) void => { // set the collection interval (e g , every minute) const intervalmilliseconds = 60 1000; // schedule periodic execution setinterval(fetchandlogkubernetesevents, intervalmilliseconds); logger info(`kubernetes event collection scheduled to run every ${intervalmilliseconds / 1000} seconds`); // run once immediately at startup fetchandlogkubernetesevents(); }; this implementation defines a function to fetch and log kubernetes events gets the kubernetes client from the server state retrieves the namespace to monitor from configuration queries the kubernetes api for events in that namespace processes and logs each event with relevant details sets up periodic execution using setinterval runs the collection once immediately at startup kubernetes client access the collector accesses the kubernetes api through a client provided by the server state const k8sclient = getstate(serverstate) k8defaultclient; this client is typically initialized during application startup handles authentication to the kubernetes api provides methods for querying different kubernetes resources manages api rate limiting and connection pooling event filtering in a production implementation, the collector would filter events to avoid processing duplicates // conceptual implementation of event filtering const eventsresponse = await k8sclient listnamespacedevent({ namespace, // only get events newer than the last check fieldselector lasttimestamp ? `lasttimestamp>=${lasttimestamp}` undefined }); this filtering reduces the amount of data processed prevents duplicate logging of the same events focuses on new events since the last collection cycle event collection workflow the complete event collection workflow follows these steps sequencediagram participant timer as interval timer participant collector as event collector participant config as application config participant serverstate as server state participant k8sapi as kubernetes api participant logger as application logger timer >>collector trigger collection cycle collector >>serverstate get kubernetes client serverstate >>collector return k8s client collector >>config get namespace setting config >>collector return namespace collector >>k8sapi listnamespacedevent(namespace) k8sapi >>collector return event list loop for each event collector >>collector extract event details collector >>logger log formatted event end collector >>collector update lasttimestamp collector >>timer complete cycle note over timer wait for next interval this diagram illustrates the timer triggers the collection cycle the collector retrieves the kubernetes client and namespace the collector queries the kubernetes api for events the collector processes and logs each event the collector updates the timestamp for the next cycle the cycle completes and waits for the next interval event types and processing the kubernetes event collector handles various types of events pod lifecycle events events related to pod creation, deletion, and status changes info (taskserver\ collect events) k8s event pod my app pod 12345 scheduled successfully assigned my namespace/my app pod 12345 to node 1 info (taskserver\ collect events) k8s event pod my app pod 12345 started started container main info (taskserver\ collect events) k8s event pod my app pod 12345 killing container failed liveness probe deployment events events related to deployment scaling and updates info (taskserver\ collect events) k8s event deployment my app scalingreplicaset scaled up replica set my app abc123 to 3 info (taskserver\ collect events) k8s event deployment my app scalingreplicaset scaled down replica set my app def456 to 0 resource constraint events events related to resource limits and constraints info (taskserver\ collect events) k8s event pod my app pod 12345 failedscheduling 0/3 nodes are available 3 insufficient memory info (taskserver\ collect events) k8s event pod my app pod 12345 oomkilled container was killed due to oom (out of memory) system events events related to system components and infrastructure info (taskserver\ collect events) k8s event node node 1 nodenotready node node 1 status is now nodenotready info (taskserver\ collect events) k8s event node node 1 nodeready node node 1 status is now nodeready advanced implementations while the basic implementation logs events, more sophisticated versions might include additional features event storage storing events in a database for historical analysis // conceptual implementation of event storage async function storeevent(app, event) { await app service('k8s events') create({ kind event involvedobject kind, name event involvedobject name, namespace event involvedobject namespace, reason event reason, message event message, timestamp new date(event lasttimestamp || event firsttimestamp), severity event type // 'normal' or 'warning' }); } alert generation generating alerts for critical events // conceptual implementation of alert generation function shouldgeneratealert(event) { // generate alerts for warnings and specific critical events return ( event type === 'warning' || (event reason === 'failed' && event involvedobject kind === 'pod') || event reason === 'failedscheduling' || event reason === 'oomkilled' ); } async function generatealert(app, event) { if (shouldgeneratealert(event)) { await app service('alerts') create({ source 'kubernetes', level event type === 'warning' ? 'warning' 'info', title `${event involvedobject kind} ${event reason}`, message `${event involvedobject kind} ${event involvedobject name} ${event message}`, timestamp new date(event lasttimestamp || event firsttimestamp) }); } } event aggregation aggregating similar events to reduce noise // conceptual implementation of event aggregation const eventcounts = new map(); function aggregateevents(events) { const aggregated = \[]; for (const event of events) { const key = `${event involvedobject kind} ${event reason}`; if (eventcounts has(key)) { // increment count for this type of event eventcounts set(key, eventcounts get(key) + 1); } else { // first occurrence of this event type eventcounts set(key, 1); aggregated push(event); } } // log the counts for (const \[key, count] of eventcounts entries()) { if (count > 1) { const \[kind, reason] = key split(' '); logger info(`aggregated ${count} similar events for ${kind} with reason ${reason}`); } } return aggregated; } integration with other components the kubernetes event collector integrates with several other components of the background processing system task server application the collector is initialized and managed by the task server application // from src/start ts collectevents(app); this integration provides the collector with the application context ensures the collector starts when the task server starts gives the collector access to the kubernetes client application configuration management the collector uses configuration to determine which namespace to monitor // from src/collect events ts const namespace = config server namespace; this integration allows the target namespace to be configured enables environment specific monitoring supports runtime adjustments without code changes periodic task scheduler the collector uses the periodic task scheduling pattern for regular execution // from src/collect events ts setinterval(fetchandlogkubernetesevents, intervalmilliseconds); this integration ensures regular monitoring of kubernetes events provides consistent operational awareness maintains an up to date view of cluster activities service interaction layer in advanced implementations, the collector might use services to store events or generate alerts // conceptual service interaction await app service('k8s events') create({ / event data / }); await app service('alerts') create({ / alert data / }); this integration enables persistent storage of events allows for alerting and notification supports integration with monitoring dashboards benefits of kubernetes event collection the kubernetes event collector provides several key benefits operational visibility provides insights into cluster activities troubleshooting helps diagnose application issues proactive monitoring identifies potential problems before they escalate historical context maintains a record of system changes resource optimization helps understand scaling patterns and resource usage security awareness captures potential security related events compliance supports audit requirements with event logging these benefits make the kubernetes event collector a valuable component for operating applications in kubernetes environments conclusion the kubernetes event collector completes our exploration of the ir engine's background processing system throughout this documentation, we've examined the task server application that initializes and manages background tasks the application configuration management system that provides operational settings the analytics data collector that gathers platform usage statistics the periodic task scheduler pattern that ensures regular task execution the service interaction layer that enables communication between components the kubernetes event collector that monitors the operational environment together, these components form a comprehensive background processing system that supports the ir engine's operational needs, providing monitoring, data collection, and system awareness